From bc37efa6dc48507899e5e7713ac3d4712bb03ea2 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 21 Feb 2009 18:40:05 +0000 Subject: [PATCH] - ofd (obdfilter over OSD) is added --- lustre/include/linux/obd_class.h | 1 + lustre/obdclass/linux/linux-obdo.c | 45 ++ lustre/ofd/Makefile.in | 7 + lustre/ofd/autoMakefile.am | 21 + lustre/ofd/lproc_ofd.c | 517 ++++++++++++++++++ lustre/ofd/ofd_capa.c | 210 +++++++ lustre/ofd/ofd_clients.c | 221 ++++++++ lustre/ofd/ofd_dev.c | 879 ++++++++++++++++++++++++++++++ lustre/ofd/ofd_fmd.c | 231 ++++++++ lustre/ofd/ofd_fs.c | 557 +++++++++++++++++++ lustre/ofd/ofd_grant.c | 515 ++++++++++++++++++ lustre/ofd/ofd_internal.h | 533 ++++++++++++++++++ lustre/ofd/ofd_io.c | 378 +++++++++++++ lustre/ofd/ofd_llog.c | 374 +++++++++++++ lustre/ofd/ofd_lvb.c | 241 ++++++++ lustre/ofd/ofd_obd.c | 1057 ++++++++++++++++++++++++++++++++++++ lustre/ofd/ofd_objects.c | 236 ++++++++ lustre/ofd/ofd_recovery.c | 337 ++++++++++++ 18 files changed, 6360 insertions(+) create mode 100644 lustre/ofd/Makefile.in create mode 100644 lustre/ofd/autoMakefile.am create mode 100644 lustre/ofd/lproc_ofd.c create mode 100644 lustre/ofd/ofd_capa.c create mode 100644 lustre/ofd/ofd_clients.c create mode 100644 lustre/ofd/ofd_dev.c create mode 100644 lustre/ofd/ofd_fmd.c create mode 100644 lustre/ofd/ofd_fs.c create mode 100644 lustre/ofd/ofd_grant.c create mode 100644 lustre/ofd/ofd_internal.h create mode 100644 lustre/ofd/ofd_io.c create mode 100644 lustre/ofd/ofd_llog.c create mode 100644 lustre/ofd/ofd_lvb.c create mode 100644 lustre/ofd/ofd_obd.c create mode 100644 lustre/ofd/ofd_objects.c create mode 100644 lustre/ofd/ofd_recovery.c diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index b891491..103aeb2 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -83,6 +83,7 @@ int ldiskfsfs_dirhash(const char *name, int len, struct ldiskfs_dx_hash_info *hi #ifdef __KERNEL__ void obdo_from_inode(struct obdo 
*dst, struct inode *src, obd_flag valid);
 void obdo_from_la(struct obdo *dst, struct lu_attr *la, obd_flag valid);
+void la_from_obdo(struct lu_attr *dst, struct obdo *obdo, obd_flag valid);
 void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid);
 void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid);
 #endif
diff --git a/lustre/obdclass/linux/linux-obdo.c b/lustre/obdclass/linux/linux-obdo.c
index e85b5ab..3dbc476 100644
--- a/lustre/obdclass/linux/linux-obdo.c
+++ b/lustre/obdclass/linux/linux-obdo.c
@@ -170,6 +170,63 @@ void obdo_from_la(struct obdo *dst, struct lu_attr *la, obd_flag valid)
 }
 EXPORT_SYMBOL(obdo_from_la);
 
+/*
+ * Copy the attributes selected by @valid from @obdo into @dst.
+ *
+ * Each OBD_MD_FL* bit set in @valid overwrites the matching lu_attr field
+ * and ORs the corresponding LA_* bit into dst->la_valid; fields that are
+ * not requested are left untouched.  The file-type and permission parts of
+ * la_mode are merged independently, so asking for only OBD_MD_FLTYPE or
+ * only OBD_MD_FLMODE preserves the other part of dst->la_mode.
+ *
+ * obdo->o_valid is not consulted: the caller chooses the fields via @valid.
+ *
+ * FIXME (original author): the logic was copied from obdo_from_inode().
+ */
+void la_from_obdo(struct lu_attr *dst, struct obdo *obdo, obd_flag valid)
+{
+        __u64 newvalid = 0;
+
+        if (valid & OBD_MD_FLATIME) {
+                dst->la_atime = obdo->o_atime;
+                newvalid |= LA_ATIME;
+        }
+        if (valid & OBD_MD_FLMTIME) {
+                dst->la_mtime = obdo->o_mtime;
+                newvalid |= LA_MTIME;
+        }
+        if (valid & OBD_MD_FLCTIME) {
+                dst->la_ctime = obdo->o_ctime;
+                newvalid |= LA_CTIME;
+        }
+        if (valid & OBD_MD_FLSIZE) {
+                dst->la_size = obdo->o_size;
+                newvalid |= LA_SIZE;
+        }
+        if (valid & OBD_MD_FLBLOCKS) { /* allocation of space (x512 bytes) */
+                dst->la_blocks = obdo->o_blocks;
+                newvalid |= LA_BLOCKS;
+        }
+        if (valid & OBD_MD_FLTYPE) {
+                dst->la_mode = (dst->la_mode & S_IALLUGO)|(obdo->o_mode & S_IFMT);
+                newvalid |= LA_TYPE;
+        }
+        if (valid & OBD_MD_FLMODE) {
+                dst->la_mode = (dst->la_mode & S_IFMT)|(obdo->o_mode & S_IALLUGO);
+                newvalid |= LA_MODE;
+        }
+        if (valid & OBD_MD_FLUID) {
+                dst->la_uid = obdo->o_uid;
+                newvalid |= LA_UID;
+        }
+        if (valid & OBD_MD_FLGID) {
+                dst->la_gid = obdo->o_gid;
+                newvalid |= LA_GID;
+        }
+        dst->la_valid |= newvalid;
+}
+EXPORT_SYMBOL(la_from_obdo);
+
 void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid)
 {
         valid &= src->o_valid;
diff --git a/lustre/ofd/Makefile.in b/lustre/ofd/Makefile.in
new file mode 100644
index
0000000..a00fabe
--- /dev/null
+++ b/lustre/ofd/Makefile.in
@@ -0,0 +1,7 @@
+MODULES := ofd
+
+ofd-objs := ofd_dev.o ofd_io.o ofd_llog.o ofd_obd.o ofd_objects.o
+ofd-objs += lproc_ofd.o ofd_lvb.o ofd_capa.o ofd_clients.o
+ofd-objs += ofd_grant.o ofd_recovery.o ofd_fs.o ofd_fmd.o
+
+@INCLUDE_RULES@
diff --git a/lustre/ofd/autoMakefile.am b/lustre/ofd/autoMakefile.am
new file mode 100644
index 0000000..20ff1559
--- /dev/null
+++ b/lustre/ofd/autoMakefile.am
@@ -0,0 +1,21 @@
+# Copyright (C) 2001 Cluster File Systems, Inc.
+#
+# This code is issued under the GNU General Public License.
+# See the file COPYING in this distribution
+
+if LUSTRE_UOSS
+noinst_LIBRARIES = libofd.a
+libofd_a_SOURCES = ofd_clients.c ofd_recovery.c ofd_fs.c \
+                   ofd_objects.c ofd_llog.c ofd_dev.c ofd_capa.c \
+                   ofd_lvb.c ofd_io.c ofd_obd.c ofd_grant.c ofd_fmd.c
+
+libofd_a_CPPFLAGS = $(LLCPPFLAGS) -DLUSTRE_VERSION=\"32\" -DBUILD_VERSION=\"1\"
+libofd_a_CFLAGS = $(LLCFLAGS)
+endif
+
+if MODULES
+modulefs_DATA = ofd$(KMODEXT)
+endif
+
+MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
+DIST_SOURCES = $(ofd-objs:%.o=%.c) ofd_internal.h
diff --git a/lustre/ofd/lproc_ofd.c b/lustre/ofd/lproc_ofd.c
new file mode 100644
index 0000000..4c29e43
--- /dev/null
+++ b/lustre/ofd/lproc_ofd.c
@@ -0,0 +1,517 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ *
+ */
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include
+#include
+#include
+#include
+#include
+
+#include "ofd_internal.h"
+
+#ifdef LPROCFS
+/* "filegroups": print the number of object groups (fo_group_count). */
+static int lprocfs_filter_rd_groups(char *page, char **start, off_t off,
+                                    int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        *eof = 1;
+        return snprintf(page, count, "%u\n", obd->u.filter.fo_group_count);
+}
+
+/* "tot_dirty": print the fo_tot_dirty counter. */
+static int lprocfs_filter_rd_tot_dirty(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+
+        LASSERT(obd != NULL);
+        *eof = 1;
+        return snprintf(page, count, LPU64"\n", obd->u.filter.fo_tot_dirty);
+}
+
+/* "tot_granted": print the fo_tot_granted counter. */
+static int lprocfs_filter_rd_tot_granted(char *page, char **start, off_t off,
+                                         int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+
+        LASSERT(obd != NULL);
+        *eof = 1;
+        return snprintf(page, count, LPU64"\n", obd->u.filter.fo_tot_granted);
+}
+
+/* "tot_pending": print the fo_tot_pending counter. */
+static int lprocfs_filter_rd_tot_pending(char *page, char **start, off_t off,
+                                         int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+
+        LASSERT(obd != NULL);
+        *eof = 1;
+        return snprintf(page, count, LPU64"\n", obd->u.filter.fo_tot_pending);
+}
+
+/*
+ * "mntdev": currently a stub that reports an empty file; the VFS-mount
+ * based implementation is compiled out.
+ */
+static int lprocfs_filter_rd_mntdev(char *page, char **start, off_t off,
+                                    int count, int *eof, void *data)
+{
+#if 0
+        struct obd_device *obd = (struct obd_device *)data;
+
+        LASSERT(obd != NULL);
+        LASSERT(obd->u.filter.fo_vfsmnt->mnt_devname);
+        *eof = 1;
+        return snprintf(page, count, "%s\n",
+                        obd->u.filter.fo_vfsmnt->mnt_devname);
+#endif
+        *eof = 1;
+        return 0;
+}
+
+/*
+ * "last_id": print one line per object group, starting at
+ * FILTER_GROUP_MDS0, with the value of filter_last_id() for that group.
+ *
+ * Returns the number of bytes placed in @page, 0 if @data is NULL, or the
+ * negative value returned by snprintf().
+ */
+static int lprocfs_filter_rd_last_id(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct filter_device *ofd;
+        struct filter_obd *filter;
+        int retval = 0, rc, i;
+
+        /* check before touching obd: the initializers used to dereference
+         * it ahead of this test */
+        if (obd == NULL)
+                return 0;
+
+        ofd = filter_dev(obd->obd_lu_dev);
+        filter = &obd->u.filter;
+
+        for (i = FILTER_GROUP_MDS0; i < filter->fo_group_count; i++) {
+                rc = snprintf(page, count, LPU64"\n", filter_last_id(ofd, i));
+                if (rc < 0) {
+                        retval = rc;
+                        break;
+                }
+                /* snprintf() reports the length it wanted to write; if that
+                 * does not fit, the page is full - stop here instead of
+                 * advancing past the buffer with a negative count */
+                if (rc >= count)
+                        break;
+                page += rc;
+                count -= rc;
+                retval += rc;
+        }
+        return retval;
+}
+
+/* "readcache_max_filesize" (read): print fo_readcache_max_filesize. */
+int lprocfs_filter_rd_readcache(char *page, char **start, off_t off, int count,
+                                int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, LPU64"\n",
+                      obd->u.filter.fo_readcache_max_filesize);
+        return rc;
+}
+
+/* "readcache_max_filesize" (write): parse a u64 and store it. */
+int lprocfs_filter_wr_readcache(struct file *file, const char *buffer,
+                                unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        __u64 val;
+        int rc;
+
+        rc = lprocfs_write_u64_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        obd->u.filter.fo_readcache_max_filesize = val;
+        return count;
+}
+
+
+/* "client_cache_count" (read): print fo_fmd_max_num. */
+int lprocfs_filter_rd_fmd_max_num(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, "%u\n", obd->u.filter.fo_fmd_max_num);
+        return rc;
+}
+
+/* "client_cache_count" (write): accept 1..65536, else -EINVAL. */
+int lprocfs_filter_wr_fmd_max_num(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val;
+        int rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val > 65536 || val < 1)
+                return -EINVAL;
+
+        obd->u.filter.fo_fmd_max_num = val;
+        return count;
+}
+
+int lprocfs_filter_rd_fmd_max_age(char *page, char **start, off_t off,
+                                  int count, int *eof, void
*data) +{ + struct obd_device *obd = data; + int rc; + + rc = snprintf(page, count, "%u\n", obd->u.filter.fo_fmd_max_age / HZ); + return rc; +} + +int lprocfs_filter_wr_fmd_max_age(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val > 65536 || val < 1) + return -EINVAL; + + obd->u.filter.fo_fmd_max_age = val * HZ; + return count; +} + +static int lprocfs_filter_rd_capa(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + int rc; + + rc = snprintf(page, count, "capability on: %s\n", + obd->u.filter.fo_fl_oss_capa ? "oss" : ""); + return rc; +} + +static int lprocfs_filter_wr_capa(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val & ~0x1) { + CERROR("invalid capability mode, only 0/1 are accepted.\n" + " 1: enable oss fid capability\n" + " 0: disable oss fid capability\n"); + return -EINVAL; + } + + obd->u.filter.fo_fl_oss_capa = val; + LCONSOLE_INFO("OSS %s %s fid capability.\n", obd->obd_name, + val ? 
"enabled" : "disabled"); + return count; +} + +static int lprocfs_filter_rd_capa_count(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + return snprintf(page, count, "%d %d\n", + capa_count[CAPA_SITE_CLIENT], + capa_count[CAPA_SITE_SERVER]); +} + +static struct lprocfs_vars lprocfs_filter_obd_vars[] = { + { "uuid", lprocfs_rd_uuid, 0, 0 }, + { "blocksize", lprocfs_rd_blksize, 0, 0 }, + { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, + { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, + { "filestotal", lprocfs_rd_filestotal, 0, 0 }, + { "filesfree", lprocfs_rd_filesfree, 0, 0 }, + { "filegroups", lprocfs_filter_rd_groups, 0, 0 }, + { "fstype", lprocfs_rd_fstype, 0, 0 }, + { "mntdev", lprocfs_filter_rd_mntdev, 0, 0 }, + { "last_id", lprocfs_filter_rd_last_id,0, 0 }, + { "tot_dirty", lprocfs_filter_rd_tot_dirty, 0, 0 }, + { "tot_pending", lprocfs_filter_rd_tot_pending, 0, 0 }, + { "tot_granted", lprocfs_filter_rd_tot_granted, 0, 0 }, + { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 }, + { "recovery_maxtime", lprocfs_obd_rd_recovery_maxtime, + lprocfs_obd_wr_recovery_maxtime, 0}, + { "evict_client", 0, lprocfs_wr_evict_client, 0, + &lprocfs_evict_client_fops}, + { "num_exports", lprocfs_rd_num_exports, 0, 0 }, + { "readcache_max_filesize", + lprocfs_filter_rd_readcache, + lprocfs_filter_wr_readcache, 0 }, +#if 0 +#ifdef HAVE_QUOTA_SUPPORT + { "quota_bunit_sz", lprocfs_rd_bunit, lprocfs_wr_bunit, 0}, + { "quota_btune_sz", lprocfs_rd_btune, lprocfs_wr_btune, 0}, + { "quota_iunit_sz", lprocfs_rd_iunit, lprocfs_wr_iunit, 0}, + { "quota_itune_sz", lprocfs_rd_itune, lprocfs_wr_itune, 0}, + { "quota_type", lprocfs_rd_type, lprocfs_wr_type, 0}, +#endif +#endif + { "client_cache_count", lprocfs_filter_rd_fmd_max_num, + lprocfs_filter_wr_fmd_max_num, 0 }, + { "client_cache_seconds", lprocfs_filter_rd_fmd_max_age, + lprocfs_filter_wr_fmd_max_age, 0 }, + { "capa", lprocfs_filter_rd_capa, + 
lprocfs_filter_wr_capa, 0 }, + { "capa_count", lprocfs_filter_rd_capa_count, 0, 0 }, + { 0 } +}; + +static struct lprocfs_vars lprocfs_filter_module_vars[] = { + { "num_refs", lprocfs_rd_numrefs, 0, 0 }, + { 0 } +}; + +void filter_tally(struct obd_export *exp, struct page **pages, int nr_pages, + unsigned long *blocks, int blocks_per_page, int wr) +{ +#if 0 + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct filter_export_data *fed = &exp->exp_filter_data; + struct page *last_page = NULL; + unsigned long *last_block = NULL; + unsigned long discont_pages = 0; + unsigned long discont_blocks = 0; + int i; + + if (nr_pages == 0) + return; + + lprocfs_oh_tally_log2(&filter->fo_filter_stats.hist[BRW_R_PAGES + wr], + nr_pages); + lprocfs_oh_tally_log2(&fed->fed_brw_stats.hist[BRW_R_PAGES + wr], + nr_pages); + + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_brw_stats) + lprocfs_oh_tally_log2(&exp->exp_nid_stats->nid_brw_stats->hist[BRW_W_PAGES + wr], + nr_pages); + while (nr_pages-- > 0) { + if (last_page && (*pages)->index != (last_page->index + 1)) + discont_pages++; + last_page = *pages; + pages++; + for (i = 0; i < blocks_per_page; i++) { + if (last_block && *blocks != (*last_block + 1)) + discont_blocks++; + last_block = blocks++; + } + } + + lprocfs_oh_tally(&filter->fo_filter_stats.hist[BRW_R_DISCONT_PAGES +wr], + discont_pages); + lprocfs_oh_tally(&fed->fed_brw_stats.hist[BRW_R_DISCONT_PAGES + wr], + discont_pages); + lprocfs_oh_tally(&filter->fo_filter_stats.hist[BRW_R_DISCONT_BLOCKS+wr], + discont_blocks); + lprocfs_oh_tally(&fed->fed_brw_stats.hist[BRW_R_DISCONT_BLOCKS + wr], + discont_blocks); + + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_brw_stats) { + lprocfs_oh_tally_log2(&exp->exp_nid_stats->nid_brw_stats->hist[BRW_W_DISCONT_PAGES + wr], + discont_pages); + lprocfs_oh_tally_log2(&exp->exp_nid_stats->nid_brw_stats->hist[BRW_W_DISCONT_BLOCKS + wr], + discont_blocks); + } +#endif +} + +#define pct(a,b) (b ? 
a * 100 / b : 0) + +static void display_brw_stats(struct seq_file *seq, char *name, char *units, + struct obd_histogram *read, struct obd_histogram *write, int log2) +{ + unsigned long read_tot, write_tot, r, w, read_cum = 0, write_cum = 0; + int i; + + seq_printf(seq, "\n%26s read | write\n", " "); + seq_printf(seq, "%-22s %-5s %% cum %% | %-5s %% cum %%\n", + name, units, units); + + read_tot = lprocfs_oh_sum(read); + write_tot = lprocfs_oh_sum(write); + for (i = 0; i < OBD_HIST_MAX; i++) { + r = read->oh_buckets[i]; + w = write->oh_buckets[i]; + read_cum += r; + write_cum += w; + if (read_cum == 0 && write_cum == 0) + continue; + + if (!log2) + seq_printf(seq, "%u", i); + else if (i < 10) + seq_printf(seq, "%u", 1<hist[BRW_R_PAGES], + &brw_stats->hist[BRW_W_PAGES], 1); + + display_brw_stats(seq, "discontiguous pages", "rpcs", + &brw_stats->hist[BRW_R_DISCONT_PAGES], + &brw_stats->hist[BRW_W_DISCONT_PAGES], 0); + + display_brw_stats(seq, "discontiguous blocks", "rpcs", + &brw_stats->hist[BRW_R_DISCONT_BLOCKS], + &brw_stats->hist[BRW_W_DISCONT_BLOCKS], 0); + + display_brw_stats(seq, "disk fragmented I/Os", "ios", + &brw_stats->hist[BRW_R_DIO_FRAGS], + &brw_stats->hist[BRW_W_DIO_FRAGS], 0); + + display_brw_stats(seq, "disk I/Os in flight", "ios", + &brw_stats->hist[BRW_R_RPC_HIST], + &brw_stats->hist[BRW_W_RPC_HIST], 0); + + { + char title[24]; + sprintf(title, "I/O time (1/%ds)", HZ); + display_brw_stats(seq, title, "ios", + &brw_stats->hist[BRW_R_IO_TIME], + &brw_stats->hist[BRW_W_IO_TIME], 1); + } + + display_brw_stats(seq, "disk I/O size", "ios", + &brw_stats->hist[BRW_R_DISK_IOSIZE], + &brw_stats->hist[BRW_W_DISK_IOSIZE], 1); +} + +#undef pct + +static int filter_brw_stats_seq_show(struct seq_file *seq, void *v) +{ + struct obd_device *dev = seq->private; + struct filter_obd *filter = &dev->u.filter; + + brw_stats_show(seq, &filter->fo_filter_stats); + + return 0; +} + +static ssize_t filter_brw_stats_seq_write(struct file *file, const char *buf, + size_t len, 
loff_t *off)
+{
+        struct seq_file *seq = file->private_data;
+        struct obd_device *dev = seq->private;
+        struct filter_obd *filter = &dev->u.filter;
+        int i;
+
+        for (i = 0; i < BRW_LAST; i++)
+                lprocfs_oh_clear(&filter->fo_filter_stats.hist[i]);
+
+        return len;
+}
+
+LPROC_SEQ_FOPS(filter_brw_stats);
+
+/* Register the per-device "brw_stats" seq file for @dev. */
+int lproc_filter_attach_seqstat(struct obd_device *dev)
+{
+        return lprocfs_obd_seq_create(dev, "brw_stats", 0444,
+                                      &filter_brw_stats_fops, dev);
+}
+
+/* Per-export brw stats: display is compiled out, so the file reads empty. */
+static int filter_per_export_stats_seq_show(struct seq_file *seq, void *v)
+{
+#if 0
+        struct filter_export_data *fed = seq->private;
+
+        brw_stats_show(seq, &fed->fed_brw_stats);
+#endif
+
+        return 0;
+}
+
+/*
+ * Per-export brw stats reset.  The reset itself is compiled out, but the
+ * handler must still consume the whole write: returning 0 for a non-empty
+ * buffer makes userspace write loops retry forever.
+ */
+static ssize_t filter_per_export_stats_seq_write(struct file *file,
+                                       const char *buf, size_t len, loff_t *off)
+{
+#if 0
+        struct seq_file *seq = file->private_data;
+        struct filter_export_data *fed = seq->private;
+        int i;
+
+        for (i = 0; i < BRW_LAST; i++)
+                lprocfs_oh_clear(&fed->fed_brw_stats.hist[i]);
+
+        return len;
+#endif
+        return len;
+}
+
+LPROC_SEQ_FOPS(filter_per_export_stats);
+
+/* Hand the module-level and per-device proc tables to the caller. */
+void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars)
+{
+        lvars->module_vars = lprocfs_filter_module_vars;
+        lvars->obd_vars = lprocfs_filter_obd_vars;
+}
+
+/* Per-NID brw stats: shown only when the NID has a histogram attached. */
+static int filter_per_nid_stats_seq_show(struct seq_file *seq, void *v)
+{
+        nid_stat_t *tmp = seq->private;
+
+        if (tmp->nid_brw_stats)
+                brw_stats_show(seq, tmp->nid_brw_stats);
+
+        return 0;
+}
+
+/* Any write clears every per-NID histogram; the data written is ignored. */
+static ssize_t filter_per_nid_stats_seq_write(struct file *file,
+                                              const char *buf, size_t len,
+                                              loff_t *off)
+{
+        struct seq_file *seq = file->private_data;
+        nid_stat_t *tmp = seq->private;
+        int i;
+
+        if (tmp->nid_brw_stats)
+                for (i = 0; i < BRW_LAST; i++)
+                        lprocfs_oh_clear(&tmp->nid_brw_stats->hist[i]);
+
+        return len;
+}
+
+LPROC_SEQ_FOPS(filter_per_nid_stats);
+#endif /* LPROCFS */
diff --git a/lustre/ofd/ofd_capa.c b/lustre/ofd/ofd_capa.c
new file mode 100644
index 0000000..23d6b52
--- /dev/null
+++ b/lustre/ofd/ofd_capa.c
@@ -0,0 +1,210 @@
+/* -*-
mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2005 Cluster File Systems, Inc.
+ *
+ * Author: Lai Siyao
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#ifdef __KERNEL__
+#include
+#include
+#include
+#include
+#include
+#endif
+
+#include
+#include
+#include "ofd_internal.h"
+
+/* Key id of a stored capability key. */
+static inline __u32 filter_ck_keyid(struct filter_capa_key *key)
+{
+        return key->k_key.lk_keyid;
+}
+
+/*
+ * Install the capability key @new in ofd->ofd_capa_keys.
+ *
+ * The list is scanned for keys with the same lk_mdsid as @new; keys[0]
+ * tracks the one with the larger key id and keys[1] the other one.  Then:
+ *  - if one of them already has new->lk_keyid, it is overwritten in place;
+ *  - otherwise, if two keys exist, keys[1] (the older one) is overwritten;
+ *  - otherwise a new entry is allocated and added to the list.
+ *
+ * In-place updates are done under the same capa_lock hold as the lookup,
+ * so the entries cannot be freed by filter_free_capa_keys() in between.
+ * The lock is dropped only around the allocation.
+ *
+ * Returns 0 on success, -ENOMEM if a new entry cannot be allocated.
+ */
+int filter_update_capa_key(struct filter_device *ofd, struct lustre_capa_key *new)
+{
+        struct filter_capa_key *k, *keys[2] = { NULL, NULL };
+        int i;
+        ENTRY;
+
+        spin_lock(&capa_lock);
+        list_for_each_entry(k, &ofd->ofd_capa_keys, k_list) {
+                if (k->k_key.lk_mdsid != new->lk_mdsid)
+                        continue;
+
+                if (keys[0]) {
+                        keys[1] = k;
+                        if (filter_ck_keyid(keys[1]) > filter_ck_keyid(keys[0]))
+                                keys[1] = keys[0], keys[0] = k;
+                } else {
+                        keys[0] = k;
+                }
+        }
+
+        for (i = 0; i < 2; i++) {
+                if (!keys[i])
+                        continue;
+                if (filter_ck_keyid(keys[i]) != new->lk_keyid)
+                        continue;
+                /* maybe because of recovery or other reasons, MDS sent the
+                 * the old capability key again.
+                 */
+                keys[i]->k_key = *new;
+                spin_unlock(&capa_lock);
+
+                RETURN(0);
+        }
+
+        if (keys[1]) {
+                /* if OSS already have two keys, update the old one */
+                keys[1]->k_key = *new;
+                spin_unlock(&capa_lock);
+
+                DEBUG_CAPA_KEY(D_SEC, new, "new");
+                RETURN(0);
+        }
+        spin_unlock(&capa_lock);
+
+        /* allocate outside the spinlock; the entry is private until it is
+         * linked into the list below */
+        OBD_ALLOC_PTR(k);
+        if (!k)
+                RETURN(-ENOMEM);
+        CFS_INIT_LIST_HEAD(&k->k_list);
+        k->k_key = *new;
+
+        spin_lock(&capa_lock);
+        list_add(&k->k_list, &ofd->ofd_capa_keys);
+        spin_unlock(&capa_lock);
+
+        DEBUG_CAPA_KEY(D_SEC, new, "new");
+        RETURN(0);
+}
+
+/*
+ * Verify that @capa authorizes operation @opc.
+ *
+ * NOTE: the whole check is compiled out in this port, so every request is
+ * currently accepted (the function returns 0 unconditionally), whatever
+ * the "capa" proc setting says.  The disabled body is kept for when
+ * capabilities are wired up again.
+ */
+int filter_auth_capa(struct filter_device *ofd, struct lu_fid *fid,
+                     __u64 mdsid, struct lustre_capa *capa, __u64 opc)
+{
+#if 0
+        struct filter_capa_key *k;
+        struct lustre_capa_key key;
+        struct obd_capa *oc;
+        __u8 *hmac;
+        int keys_ready = 0, key_found = 0, rc = 0;
+        ENTRY;
+
+        /* capability is disabled */
+        if (!ofd->ofd_fl_oss_capa)
+                RETURN(0);
+
+        if (capa == NULL) {
+                if (fid)
+                        CERROR("mdsno/fid/opc "LPU64"/"DFID"/"LPX64
+                               ": no capability has been passed\n",
+                               mdsid, PFID(fid), opc);
+                else
+                        CERROR("mdsno/opc "LPU64"/"LPX64
+                               ": no capability has been passed\n",
+                               mdsid, opc);
+                RETURN(-EACCES);
+        }
+
+#warning "enable fid check in filter_auth_capa() when fid stored in OSS object"
+
+        if (opc == CAPA_OPC_OSS_READ) {
+                if (!(capa->lc_opc & CAPA_OPC_OSS_RW))
+                        rc = -EACCES;
+        } else if (!capa_opc_supported(capa, opc)) {
+                rc = -EACCES;
+        }
+        if (rc) {
+                DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
+                RETURN(rc);
+        }
+
+        oc = capa_lookup(ofd->ofd_capa_hash, capa, 0);
+        if (oc) {
+                spin_lock(&oc->c_lock);
+                if (capa_is_expired(oc)) {
+                        DEBUG_CAPA(D_ERROR, capa, "expired");
+                        rc = -ESTALE;
+                }
+                spin_unlock(&oc->c_lock);
+
+                capa_put(oc);
+                RETURN(rc);
+        }
+
+        spin_lock(&capa_lock);
+        list_for_each_entry(k, &ofd->ofd_capa_keys, k_list)
+                if (k->k_key.lk_mdsid == mdsid) {
+                        keys_ready = 1;
+                        if (k->k_key.lk_keyid == capa_keyid(capa)) {
+                                key = k->k_key;
+                                key_found = 1;
+                                break;
+                        }
+                }
+        spin_unlock(&capa_lock);
+
+        if (!keys_ready) {
+                CDEBUG(D_SEC, "MDS hasn't propagated capability keys yet, "
+                       "ignore check!\n");
+                RETURN(0);
+        }
+
+        if (!key_found) {
+                DEBUG_CAPA(D_ERROR, capa, "no matched capability key for");
+                RETURN(-ESTALE);
+        }
+
+        OBD_ALLOC(hmac, CAPA_HMAC_MAX_LEN);
+        if (hmac == NULL)
+                RETURN(-ENOMEM);
+
+        rc = capa_hmac(hmac, capa, key.lk_key);
+        if (rc) {
+                DEBUG_CAPA(D_ERROR, capa, "HMAC failed: rc %d", rc);
+                OBD_FREE(hmac, CAPA_HMAC_MAX_LEN);
+                RETURN(rc);
+        }
+
+        rc = memcmp(hmac, capa->lc_hmac, CAPA_HMAC_MAX_LEN);
+        OBD_FREE(hmac, CAPA_HMAC_MAX_LEN);
+        if (rc) {
+                DEBUG_CAPA_KEY(D_ERROR, &key, "calculate HMAC with ");
+                DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
+                RETURN(-EACCES);
+        }
+
+        /* store in capa hash */
+        oc = capa_add(ofd->ofd_capa_hash, capa);
+        capa_put(oc);
+        RETURN(0);
+#else
+        /* no ENTRY on this path, so use a plain return to keep the
+         * entry/exit debug trace balanced */
+        return 0;
+#endif
+}
+
+/* Unlink and free every capability key of @ofd, under capa_lock. */
+void filter_free_capa_keys(struct filter_device *ofd)
+{
+        struct filter_capa_key *key, *n;
+
+        spin_lock(&capa_lock);
+        list_for_each_entry_safe(key, n, &ofd->ofd_capa_keys, k_list) {
+                list_del_init(&key->k_list);
+                OBD_FREE(key, sizeof(*key));
+        }
+        spin_unlock(&capa_lock);
+}
diff --git a/lustre/ofd/ofd_clients.c b/lustre/ofd/ofd_clients.c
new file mode 100644
index 0000000..d0717a1
--- /dev/null
+++ b/lustre/ofd/ofd_clients.c
@@ -0,0 +1,221 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/ofd/ofd_clients.c
+ *
+ * Copyright (c) 2007 Cluster File Systems, Inc.
+ * Author: Peter Braam
+ * Author: Andreas Dilger
+ * Author: Alex Tomas
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * Register a new client: pick a free slot in the last_rcvd bitmap, record
+ * its index/offset in @fed and write the client's lcd to that slot.
+ *
+ * The export created for the OFD's own UUID is not recorded (returns 0).
+ *
+ * Returns 0 on success, -EOVERFLOW when all LR_MAX_CLIENTS slots are in
+ * use, or the error from creating/starting the transaction or from the
+ * last_rcvd write.  On a transaction or write error the slot stays marked
+ * in the bitmap and fed_lr_idx/fed_lr_off stay set; filter_client_free()
+ * asserts that the bit is still set when it releases the slot.
+ */
+int filter_client_new(const struct lu_env *env, struct filter_device *ofd,
+                      struct filter_export_data *fed)
+{
+        struct obd_device *obd = filter_obd(ofd);
+        unsigned long *bitmap = ofd->ofd_last_rcvd_slots;
+        struct lsd_client_data *lcd = fed->fed_lcd;
+        struct filter_thread_info *info = filter_info(env);
+        int err, cl_idx = 0;
+        struct thandle *th;
+        ENTRY;
+
+        LASSERT(bitmap != NULL);
+
+        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+        if (!strcmp((char *)lcd->lcd_uuid, (char *)obd->obd_uuid.uuid))
+                RETURN(0);
+
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+         * there's no need for extra complication here
+         */
+        cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+repeat:
+        if (cl_idx >= LR_MAX_CLIENTS) {
+                CERROR("no client slots - fix LR_MAX_CLIENTS\n");
+                RETURN(-EOVERFLOW);
+        }
+        /* somebody else may have grabbed the slot since the search */
+        if (test_and_set_bit(cl_idx, bitmap)) {
+                cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, cl_idx);
+                goto repeat;
+        }
+
+        fed->fed_lr_idx = cl_idx;
+        fed->fed_lr_off = ofd->ofd_fsd.lsd_client_start +
+                          cl_idx * ofd->ofd_fsd.lsd_client_size;
+        init_mutex(&fed->fed_lastrcvd_lock);
+        LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
+
+        CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid);
+
+        CDEBUG(D_INFO, "writing client lcd at idx %u (%llu) (len %u)\n",
+               fed->fed_lr_idx, fed->fed_lr_off,
+               (unsigned int)sizeof(*fed->fed_lcd));
+
+        th = filter_trans_create(env, ofd);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+        /* off is changed, use tmp value */
+        info->fti_off = fed->fed_lr_off;
+        dt_declare_record_write(env, ofd->ofd_last_rcvd, info->fti_off,
+                                sizeof(*lcd), th, BYPASS_CAPA);
+        err = filter_trans_start(env, ofd, th);
+        if (err)
+                /* the handle from filter_trans_create() must still be
+                 * released, otherwise it is leaked */
+                GOTO(stop, err);
+        /* XXX: until this operations will be committed the sync is needed for this
+         * export */
+        /*
+        mdt_trans_add_cb(th, mdt_cb_new_client, mti->mti_exp);
+        spin_lock(&mti->mti_exp->exp_lock);
+        mti->mti_exp->exp_need_sync = 1;
+        spin_unlock(&mti->mti_exp->exp_lock);
+        */
+
+        err = filter_last_rcvd_write(env, ofd, lcd, &info->fti_off, th);
+
+        /* sizeof() is a size_t: cast it to match %u, as done above */
+        CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
+               cl_idx, fed->fed_lr_off, (unsigned int)sizeof(*fed->fed_lcd));
+
+stop:
+        filter_trans_stop(env, ofd, th);
+
+        RETURN(err);
+}
+
+/*
+ * Register a client whose slot index @cl_idx is already known (its data
+ * has just been read from last_rcvd): mark the slot used and record the
+ * index/offset in @fed.  Nothing is written to disk.
+ *
+ * The export created for the OFD's own UUID is not recorded.  Finding the
+ * slot already taken is treated as fatal (LBUG).  Always returns 0.
+ */
+int filter_client_add(const struct lu_env *env, struct filter_device *ofd,
+                      struct filter_export_data *fed, int cl_idx)
+{
+        struct obd_device *obd = filter_obd(ofd);
+        unsigned long *bitmap = ofd->ofd_last_rcvd_slots;
+        ENTRY;
+
+        LASSERT(bitmap != NULL);
+        LASSERT(cl_idx >= 0);
+
+        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+        if (!strcmp((char *)fed->fed_lcd->lcd_uuid, (char *)obd->obd_uuid.uuid))
+                RETURN(0);
+
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+         * there's no need for extra complication here
+         */
+        if (test_and_set_bit(cl_idx, bitmap)) {
+                CERROR("FILTER client %d: bit already set in bitmap!\n",
+                       cl_idx);
+                LBUG();
+        }
+
+        fed->fed_lr_idx = cl_idx;
+        fed->fed_lr_off = ofd->ofd_fsd.lsd_client_start +
+                          cl_idx * ofd->ofd_fsd.lsd_client_size;
+        init_mutex(&fed->fed_lastrcvd_lock);
+        LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
+
+        CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid);
+
+        RETURN(0);
+}
+
+/*
+ * Release the last_rcvd slot of @exp and free its in-memory lcd.
+ *
+ * Unless the export is flagged OBD_OPT_FAILOVER, the on-disk slot is
+ * zeroed and the last_rcvd header rewritten in one transaction before the
+ * bitmap bit is cleared.  This is best-effort cleanup: errors are logged
+ * (or traced through GOTO) and the function always returns 0.  If the
+ * transaction cannot be created or started, the lcd is still freed but
+ * the bitmap bit is left set.
+ */
+int filter_client_free(struct lu_env *env, struct obd_export *exp)
+{
+        struct filter_export_data *fed = &exp->exp_filter_data;
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_device *ofd = filter_exp(exp);
+        struct lsd_client_data *lcd = fed->fed_lcd;
+        struct thandle *th;
+        int rc;
+        ENTRY;
+
+        if (fed->fed_lcd == NULL)
+                RETURN(0);
+
+        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+        if (!strcmp((char *)fed->fed_lcd->lcd_uuid, (char *)obd->obd_uuid.uuid))
+                GOTO(free, 0);
+
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_lcd->lcd_uuid);
+
+        LASSERT(ofd->ofd_last_rcvd_slots != NULL);
+
+        /* Clear the bit _after_ zeroing out the client so we don't
+           race with filter_client_add and zero out new clients.*/
+        if (!test_bit(fed->fed_lr_idx, ofd->ofd_last_rcvd_slots)) {
+                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+                       fed->fed_lr_idx);
+                LBUG();
+        }
+
+        if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
+                struct filter_thread_info *info = filter_info(env);
+
+                th = filter_trans_create(env, ofd);
+                if (IS_ERR(th))
+                        GOTO(free, rc = PTR_ERR(th));
+                /* declare last_rcvd write */
+                dt_declare_record_write(env, ofd->ofd_last_rcvd, fed->fed_lr_off,
+                                        sizeof(*lcd), th, BYPASS_CAPA);
+                /* declare header write */
+                dt_declare_record_write(env, ofd->ofd_last_rcvd, 0,
+                                        sizeof(ofd->ofd_fsd), th, BYPASS_CAPA);
+
+                rc = filter_trans_start(env, ofd, th);
+                if (rc) {
+                        /* release the handle instead of leaking it */
+                        filter_trans_stop(env, ofd, th);
+                        GOTO(free, rc);
+                }
+                mutex_down(&fed->fed_lastrcvd_lock);
+                memset(lcd, 0, sizeof(*lcd));
+                /* off is changed after write, use tmp value */
+                info->fti_off = fed->fed_lr_off;
+                rc = filter_last_rcvd_write(env, ofd, lcd,
+                                            &info->fti_off, th);
+                mutex_up(&fed->fed_lastrcvd_lock);
+
+                /* update server's transno; a failed slot write is reported
+                 * by the CDEBUG below rather than asserting on an I/O
+                 * error */
+                if (rc == 0)
+                        filter_last_rcvd_header_write(env, ofd, th);
+
+                filter_trans_stop(env, ofd, th);
+
+                /* NOTE(review): lcd (== fed->fed_lcd) was zeroed above, so
+                 * the UUID printed here is empty; the "freeing client"
+                 * message above carries the real UUID */
+                CDEBUG(rc == 0 ? D_INFO : D_ERROR,
+                       "zeroing out client %s at idx %u (%llu) in %s rc %d\n",
+                       fed->fed_lcd->lcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+                       LAST_RCVD, rc);
+        }
+
+        if (!test_and_clear_bit(fed->fed_lr_idx, ofd->ofd_last_rcvd_slots)) {
+                CERROR("FILTER client %u: bit already clear in bitmap!!\n",
+                       fed->fed_lr_idx);
+                LBUG();
+        }
+
+        EXIT;
+free:
+        OBD_FREE(fed->fed_lcd, sizeof(*fed->fed_lcd));
+        fed->fed_lcd = NULL;
+
+        return 0;
+}
+
diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c
new file mode 100644
index 0000000..45f37b8
--- /dev/null
+++ b/lustre/ofd/ofd_dev.c
@@ -0,0 +1,879 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/ofd/ofd_dev.c
+ *
+ * Copyright (c) 2001-2007 Cluster File Systems, Inc.
+ * Author: Peter Braam
+ * Author: Andreas Dilger
+ * Author: Alex Tomas
+ * Author: Mike Pershin
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include + +#include "ofd_internal.h" + +struct lu_object_operations filter_obj_ops; +struct lu_context_key filter_thread_key; + +struct filter_intent_args { + struct ldlm_lock **victim; + __u64 size; + int *liblustre; +}; + +static enum interval_iter filter_intent_cb(struct interval_node *n, + void *args) +{ + struct ldlm_interval *node = (struct ldlm_interval *)n; + struct filter_intent_args *arg = (struct filter_intent_args*)args; + __u64 size = arg->size; + struct ldlm_lock **v = arg->victim; + struct ldlm_lock *lck; + + /* If the interval is lower than the current file size, + * just break. */ + if (interval_high(n) <= size) + return INTERVAL_ITER_STOP; + + list_for_each_entry(lck, &node->li_group, l_sl_policy) { + /* Don't send glimpse ASTs to liblustre clients. + * They aren't listening for them, and they do + * entirely synchronous I/O anyways. */ + if (lck->l_export == NULL || + lck->l_export->exp_libclient == 1) + continue; + + if (*arg->liblustre) + *arg->liblustre = 0; + + if (*v == NULL) { + *v = LDLM_LOCK_GET(lck); + } else if ((*v)->l_policy_data.l_extent.start < + lck->l_policy_data.l_extent.start) { + LDLM_LOCK_RELEASE(*v); + *v = LDLM_LOCK_GET(lck); + } + + /* the same policy group - every lock has the + * same extent, so needn't do it any more */ + break; + } + + return INTERVAL_ITER_CONT; +} + +static int filter_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) +{ + CFS_LIST_HEAD(rpc_list); + struct ptlrpc_request *req = req_cookie; + struct ldlm_lock *lock = *lockp, *l = NULL; + struct ldlm_resource *res = lock->l_resource; + ldlm_processing_policy policy; + struct ost_lvb *res_lvb, *reply_lvb; + struct ldlm_reply *rep; + ldlm_error_t err; + int idx, rc, tmpflags = 0, only_liblustre = 1; + struct ldlm_interval_tree *tree; + struct filter_intent_args arg; + __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = 
sizeof(struct ptlrpc_body), + [DLM_LOCKREPLY_OFF] = sizeof(*rep), + [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb) }; + ENTRY; + + policy = ldlm_get_processing_policy(res); + LASSERT(policy != NULL); + LASSERT(req != NULL); + + rc = lustre_pack_reply(req, 3, repsize, NULL); + if (rc) + RETURN(req->rq_status = rc); + + rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep)); + LASSERT(rep != NULL); + + reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, + sizeof(*reply_lvb)); + LASSERT(reply_lvb != NULL); + + //fixup_handle_for_resent_req(req, lock, &lockh); + + /* Call the extent policy function to see if our request can be + * granted, or is blocked. + * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse + * lock, and should not be granted if the lock will be blocked. + */ + + LASSERT(ns == res->lr_namespace); + lock_res(res); + rc = policy(lock, &tmpflags, 0, &err, &rpc_list); + check_res_locked(res); + + /* FIXME: we should change the policy function slightly, to not make + * this list at all, since we just turn around and free it */ + while (!list_empty(&rpc_list)) { + struct ldlm_lock *wlock = + list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast); + LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); + LASSERT(lock->l_flags & LDLM_FL_CP_REQD); + lock->l_flags &= ~LDLM_FL_CP_REQD; + list_del_init(&wlock->l_cp_ast); + LDLM_LOCK_RELEASE(wlock); + } + + /* The lock met with no resistance; we're finished. */ + if (rc == LDLM_ITER_CONTINUE) { + /* do not grant locks to the liblustre clients: they cannot + * handle ASTs robustly. We need to do this while still + * holding ns_lock to avoid the lock remaining on the res_link + * list (and potentially being added to l_pending_list by an + * AST) when we are going to drop this lock ASAP. 
*/ + if (lock->l_export->exp_libclient || + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) { + ldlm_resource_unlink_lock(lock); + err = ELDLM_LOCK_ABORTED; + } else { + err = ELDLM_LOCK_REPLACED; + } + unlock_res(res); + RETURN(err); + } + + /* Do not grant any lock, but instead send GL callbacks. The extent + * policy nicely created a list of all PW locks for us. We will choose + * the highest of those which are larger than the size in the LVB, if + * any, and perform a glimpse callback. */ + res_lvb = res->lr_lvb_data; + LASSERT(res_lvb != NULL); + *reply_lvb = *res_lvb; + + /* + * ->ns_lock guarantees that no new locks are granted, and, + * therefore, that res->lr_lvb_data cannot increase beyond the + * end of already granted lock. As a result, it is safe to + * check against "stale" reply_lvb->lvb_size value without + * res->lr_lvb_sem. + */ + arg.size = reply_lvb->lvb_size; + arg.victim = &l; + arg.liblustre = &only_liblustre; + for (idx = 0; idx < LCK_MODE_NUM; idx++) { + tree = &res->lr_itree[idx]; + if (tree->lit_mode == LCK_PR) + continue; + + interval_iterate_reverse(tree->lit_root, + filter_intent_cb, &arg); + } + unlock_res(res); + + /* There were no PW locks beyond the size in the LVB; finished. */ + if (l == NULL) { + if (only_liblustre) { + /* If we discovered a liblustre client with a PW lock, + * however, the LVB may be out of date! The LVB is + * updated only on glimpse (which we don't do for + * liblustre clients) and cancel (which the client + * obviously has not yet done). So if it has written + * data but kept the lock, the LVB is stale and needs + * to be updated from disk. + * + * Of course, this will all disappear when we switch to + * taking liblustre locks on the OST. */ + ldlm_res_lvbo_update(res, NULL, 0, 1); + } + RETURN(ELDLM_LOCK_ABORTED); + } + + /* + * This check is for lock taken in filter_prepare_destroy() that does + * not have l_glimpse_ast set. 
So the logic is: if there is a lock + * with no l_glimpse_ast set, this object is being destroyed already. + * + * Hence, if you are grabbing DLM locks on the server, always set + * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()). + */ + if (l->l_glimpse_ast == NULL) { + /* We are racing with unlink(); just return -ENOENT */ + rep->lock_policy_res1 = -ENOENT; + goto out; + } + + LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l); + rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ + /* Update the LVB from disk if the AST failed (this is a legal race) */ + /* + * XXX nikita: situation when ldlm_server_glimpse_ast() failed before + * sending ast is not handled. This can result in lost client writes. + */ + if (rc != 0) + ldlm_res_lvbo_update(res, NULL, 0, 1); + + lock_res(res); + *reply_lvb = *res_lvb; + unlock_res(res); + + out: + LDLM_LOCK_RELEASE(l); + + RETURN(ELDLM_LOCK_ABORTED); +} + +/* used by MGS to process specific configurations */ +static int filter_process_config(const struct lu_env *env, + struct lu_device *d, struct lustre_cfg *cfg) +{ + struct filter_device *m = filter_dev(d); + struct dt_device *dt_next = m->ofd_osd; + struct lu_device *next = &dt_next->dd_lu_dev; + int rc = 0; + ENTRY; + + switch (cfg->lcfg_command) { + case LCFG_PARAM: { + struct lprocfs_static_vars lvars; + + lprocfs_filter_init_vars(&lvars); + rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, cfg, + d->ld_obd); + if (rc) + /* others are passed further */ + rc = next->ld_ops->ldo_process_config(env, next, cfg); + break; + } + case LCFG_SPTLRPC_CONF: { + //struct sptlrpc_conf_log *log; + //struct sptlrpc_rule_set tmp_rset; + LBUG(); +#if 0 + log = sptlrpc_conf_log_extract(cfg); + if (IS_ERR(log)) { + rc = PTR_ERR(log); + break; + } + + sptlrpc_rule_set_init(&tmp_rset); + + rc = sptlrpc_rule_set_from_log(&tmp_rset, log); + if (rc) { + CERROR("obd %s: failed get sptlrpc rules: %d\n", + d->ld_obd->obd_name, rc); + break; + } + + 
write_lock(&m->ofd_sptlrpc_lock); + sptlrpc_rule_set_free(&m->ofd_sptlrpc_rset); + m->ofd_sptlrpc_rset = tmp_rset; + write_unlock(&m->ofd_sptlrpc_lock); + + sptlrpc_target_update_exp_flavor(d->ld_obd, &tmp_rset); +#endif + break; + } + default: + /* others are passed further */ + rc = next->ld_ops->ldo_process_config(env, next, cfg); + break; + } + RETURN(rc); +} + +static struct lu_object *filter_object_alloc(const struct lu_env *env, + const struct lu_object_header *hdr, + struct lu_device *d) +{ + struct filter_object *of; + + ENTRY; + + OBD_ALLOC_PTR(of); + if (of != NULL) { + struct lu_object *o; + struct lu_object_header *h; + + o = &of->ofo_obj.do_lu; + h = &of->ofo_header; + lu_object_header_init(h); + lu_object_init(o, h, d); + lu_object_add_top(h, o); + o->lo_ops = &filter_obj_ops; + RETURN(o); + } else + RETURN(NULL); +} + +static int filter_object_init(const struct lu_env *env, struct lu_object *o, + const struct lu_object_conf *conf) +{ + struct filter_device *d = filter_dev(o->lo_dev); + struct lu_device *under; + struct lu_object *below; + int rc = 0; + ENTRY; + + CDEBUG(D_INFO, "object init, fid = "DFID"\n", + PFID(lu_object_fid(o))); + + under = &d->ofd_osd->dd_lu_dev; + below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); + if (below != NULL) { + lu_object_add(o, below); + } else + rc = -ENOMEM; + + RETURN(rc); +} + +static void filter_object_free(const struct lu_env *env, struct lu_object *o) +{ + struct filter_object *of = filter_obj(o); + struct lu_object_header *h; + ENTRY; + + h = o->lo_header; + CDEBUG(D_INFO, "object free, fid = "DFID"\n", + PFID(lu_object_fid(o))); + + lu_object_fini(o); + lu_object_header_fini(h); + OBD_FREE_PTR(of); + EXIT; +} + +static int filter_object_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) +{ + return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o); +} + +static struct lu_device_operations filter_lu_ops = { + .ldo_object_alloc = filter_object_alloc, + 
.ldo_process_config = filter_process_config +}; + +struct lu_object_operations filter_obj_ops = { + .loo_object_init = filter_object_init, + .loo_object_free = filter_object_free, + .loo_object_print = filter_object_print +}; + +static struct lu_device *filter_layer_setup(const struct lu_env *env, + const char *typename, + struct lu_device *child, + struct lustre_cfg *cfg) +{ + const char *dev = lustre_cfg_string(cfg, 0); + struct obd_type *type; + struct lu_device_type *ldt; + struct lu_device *d; + int rc; + ENTRY; + + /* find the type */ + type = class_get_type(typename); + if (!type) { + CERROR("Unknown type: '%s'\n", typename); + GOTO(out, rc = -ENODEV); + } + + rc = lu_env_refill((struct lu_env *) &env->le_ctx); + if (rc != 0) { + CERROR("Failure to refill context: '%d'\n", rc); + GOTO(out_type, rc); + } + + if (env->le_ses != NULL) { + rc = lu_context_refill(env->le_ses); + if (rc != 0) { + CERROR("Failure to refill session: '%d'\n", rc); + GOTO(out_type, rc); + } + } + + ldt = type->typ_lu; + if (ldt == NULL) { + CERROR("type: '%s'\n", typename); + GOTO(out_type, rc = -EINVAL); + } + + ldt->ldt_obd_type = type; + d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg); + if (IS_ERR(d)) { + CERROR("Cannot allocate device: '%s'\n", typename); + GOTO(out_type, rc = -ENODEV); + } + + LASSERT(child->ld_site); + d->ld_site = child->ld_site; + + type->typ_refcnt++; + rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child); + if (rc) { + CERROR("can't init device '%s', rc %d\n", typename, rc); + GOTO(out_alloc, rc); + } + lu_device_get(d); + + RETURN(d); + +out_alloc: + ldt->ldt_ops->ldto_device_free(env, d); + type->typ_refcnt--; +out_type: + class_put_type(type); +out: + return ERR_PTR(rc); +} + +static int filter_stack_init(const struct lu_env *env, + struct filter_device *m, struct lustre_cfg *cfg) +{ + struct lu_device *d = &m->ofd_dt_dev.dd_lu_dev; + struct lu_device *tmp; + int rc; + ENTRY; + + /* init the stack */ + tmp = filter_layer_setup(env, LUSTRE_OSD_NAME, 
d, cfg); + if (IS_ERR(tmp)) { + RETURN(PTR_ERR(tmp)); + } + m->ofd_osd = lu2dt_dev(tmp); + + /* process setup config */ + rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg); + if (rc) + GOTO(out, rc); + + rc = tmp->ld_ops->ldo_prepare(env, d, tmp); + GOTO(out, rc); + +out: + /* XXX: error handling */ + LASSERT(rc == 0); + + return rc; +} + +static void filter_stack_fini(const struct lu_env *env, + struct filter_device *m, struct lu_device *top) +{ + LBUG(); +} + +#if 0 +static struct lvfs_callback_ops null_ops = { + .l_fid2dentry = NULL +}; +#endif + +extern int ost_handle(struct ptlrpc_request *req); + +static int filter_procfs_init(struct filter_device *ofd) +{ + struct lprocfs_static_vars lvars; + struct obd_device *obd = filter_obd(ofd); + int rc = 0; + + /* lprocfs must be setup before the filter so state can be safely added + * to /proc incrementally as the filter is setup */ + lprocfs_filter_init_vars(&lvars); + if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && + lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST) == 0) { + /* Init obdfilter private stats here */ + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES, + LPROCFS_CNTR_AVGMINMAX, + "read_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + LPROCFS_CNTR_AVGMINMAX, + "write_bytes", "bytes"); + + lproc_filter_attach_seqstat(obd); + obd->obd_proc_exports_entry = lprocfs_register("exports", + obd->obd_proc_entry, + NULL, NULL); + if (IS_ERR(obd->obd_proc_exports_entry)) { + rc = PTR_ERR(obd->obd_proc_exports_entry); + CERROR("error %d setting up lprocfs for %s\n", + rc, "exports"); + obd->obd_proc_exports_entry = NULL; + } + } + if (obd->obd_proc_exports_entry) + lprocfs_add_simple(obd->obd_proc_exports_entry, "clear", + lprocfs_nid_stats_clear_read, + lprocfs_nid_stats_clear_write, obd, NULL); + return rc; +} + +static int filter_procfs_fini(struct filter_device *ofd) +{ + struct obd_device *obd = filter_obd(ofd); + + lprocfs_remove_proc_entry("clear", 
obd->obd_proc_exports_entry); + lprocfs_free_per_client_stats(obd); + lprocfs_free_obd_stats(obd); + lprocfs_obd_cleanup(obd); + return 0; +} + +static int filter_init0(const struct lu_env *env, struct filter_device *m, + struct lu_device_type *ldt, struct lustre_cfg *cfg) +{ + const char *dev = lustre_cfg_string(cfg, 0); + struct filter_thread_info *info = filter_info_init(env, NULL); + struct filter_obd *filter; + struct lustre_mount_info *lmi; + struct obd_device *obd; + struct lu_site *s; + int rc; + ENTRY; + + rc = lu_env_refill((struct lu_env *)env); + if (rc != 0) + RETURN(rc); + + LASSERT(info != NULL); + + obd = class_name2obd(dev); + LASSERT(obd != NULL); + + lmi = server_get_mount_2(dev); + obd->obd_fsops = fsfilt_get_ops(MT_STR(s2lsi(lmi->lmi_sb)->lsi_ldd)); + LASSERT(obd->obd_fsops != NULL); + + spin_lock_init(&m->ofd_transno_lock); + spin_lock_init(&m->ofd_client_bitmap_lock); + + m->ofd_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT; + m->ofd_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; + + /* grant data */ + spin_lock_init(&m->ofd_grant_lock); + m->ofd_tot_dirty = 0; + m->ofd_tot_granted = 0; + m->ofd_tot_pending = 0; + +#if 0 + rwlock_init(&m->ofd_sptlrpc_lock); + sptlrpc_rule_set_init(&m->ofd_sptlrpc_rset); +#else + filter = &obd->u.filter; + rwlock_init(&filter->fo_sptlrpc_lock); + sptlrpc_rule_set_init(&filter->fo_sptlrpc_rset); +#endif + + m->ofd_fl_oss_capa = 0; + CFS_INIT_LIST_HEAD(&m->ofd_capa_keys); + m->ofd_capa_hash = init_capa_hash(); + if (m->ofd_capa_hash == NULL) + RETURN(-ENOMEM); + + CFS_INIT_LIST_HEAD(&m->ofd_llog_list); + spin_lock_init(&m->ofd_llog_list_lock); + m->ofd_lcm = NULL; + + OBD_ALLOC_PTR(s); + if (s == NULL) + RETURN(-ENOMEM); + + dt_device_init(&m->ofd_dt_dev, ldt); + m->ofd_dt_dev.dd_lu_dev.ld_ops = &filter_lu_ops; + m->ofd_dt_dev.dd_lu_dev.ld_obd = obd; + /* set this lu_device to obd, because error handling need it */ + obd->obd_lu_dev = &m->ofd_dt_dev.dd_lu_dev; + + rc = lu_site_init(s, &m->ofd_dt_dev.dd_lu_dev); + if (rc) 
{ + CERROR("Can't init lu_site, rc %d\n", rc); + GOTO(err_free_site, rc); + } + + rc = filter_procfs_init(m); + if (rc) { + CERROR("Can't init filter lprocfs, rc %d\n", rc); + GOTO(err_fini_proc, rc); + } + + obd->obd_replayable = 1; + /* No connection accepted until configurations will finish */ + obd->obd_no_conn = 1; + + if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) { + char *str = lustre_cfg_string(cfg, 4); + if (strchr(str, 'n')) { + CWARN("%s: recovery disabled\n", obd->obd_name); + obd->obd_replayable = 0; + } + } + + /* init the stack */ + rc = filter_stack_init(env, m, cfg); + if (rc) { + CERROR("Can't init device stack, rc %d\n", rc); + GOTO(err_fini_proc, rc); + } + + snprintf(info->fti_u.ns_name, sizeof info->fti_u.ns_name, + LUSTRE_OST_NAME"-%p", m); + m->ofd_namespace = ldlm_namespace_new(obd, info->fti_u.ns_name, + LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY); + if (m->ofd_namespace == NULL) + GOTO(err_stack_fini, rc = -ENOMEM); + + dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf); + + ldlm_register_intent(m->ofd_namespace, filter_intent_policy); + m->ofd_namespace->ns_lvbo = &filter_lvbo; + m->ofd_namespace->ns_lvbp = m; + /* set obd_namespace for compatibility with old code */ + obd->obd_namespace = m->ofd_namespace; + + ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, + "filter_ldlm_cb_client", &obd->obd_ldlm_client); + + rc = filter_fs_setup(env, m, obd); + if (rc) + GOTO(err_free_ns, rc); + +#if 0 + lvfs_init_ctxt(&obd->obd_lvfs_ctxt, lmi->lmi_mnt, &null_ops); + + LASSERT(obd->obd_olg.olg_group == OBD_LLOG_GROUP); + rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL); + LASSERT(rc == 0); +#endif + + target_recovery_init(obd, ost_handle); + + rc = lu_site_init_finish(s); + if (rc) + GOTO(err_fs_cleanup, rc); + + //if (obd->obd_recovering == 0) + // filter_postrecov(env, m); + + if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) + ldlm_timeout = 6; + + RETURN(0); + +err_fs_cleanup: + target_recovery_fini(obd); + 
filter_fs_cleanup(env, m); +err_free_ns: + ldlm_namespace_free(m->ofd_namespace, 0, obd->obd_force); + obd->obd_namespace = m->ofd_namespace = NULL; +err_stack_fini: + filter_stack_fini(env, m, &m->ofd_osd->dd_lu_dev); +err_fini_proc: + filter_procfs_fini(m); + lu_site_fini(s); +err_free_site: + OBD_FREE_PTR(s); + + dt_device_fini(&m->ofd_dt_dev); + return (rc); +} + +static void filter_fini(const struct lu_env *env, struct filter_device *m) +{ + struct filter_obd *filter = &obd->u.filter; + ENTRY; + + if (obd->obd_fail) + LCONSOLE_WARN("%s: shutting down for failover; client state " + "will be preserved.\n", obd->obd_name); + + if (!list_empty(&obd->obd_exports)) { + CERROR("%s: still has clients!\n", obd->obd_name); + class_disconnect_exports(obd); + if (!list_empty(&obd->obd_exports)) { + CERROR("still has exports after forced cleanup?\n"); + RETURN(-EBUSY); + } + } + + filter_procfs_fini(); +#if 0 + lquota_cleanup(filter_quota_interface_ref, obd); +#endif + + /* Stop recovery before namespace cleanup. 
*/ + target_recovery_fini(obd); + target_cleanup_recovery(obd); + + ldlm_namespace_free(obd->obd_namespace, obd->obd_force); + + sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset); + + filter_post(obd); + +#if 0 + LL_DQUOT_OFF(obd->u.obt.obt_sb); + shrink_dcache_parent(obd->u.obt.obt_sb->s_root); +#endif + + server_put_mount(obd->obd_name, filter->fo_vfsmnt); + + LCONSOLE_INFO("OST %s has stopped.\n", obd->obd_name); + + RETURN(0); +} + +static struct lu_device* filter_device_fini(const struct lu_env *env, + struct lu_device *d) +{ + ENTRY; + filter_fini(env, filter_dev(d)); + RETURN(NULL); +} + +static struct lu_device *filter_device_free(const struct lu_env *env, + struct lu_device *d) +{ + struct filter_device *m = filter_dev(d); + + LBUG(); + OBD_FREE_PTR(m); + /* XXX: see mdt_device_free() */ + LBUG(); + RETURN(NULL); +} + +static struct lu_device *filter_device_alloc(const struct lu_env *env, + struct lu_device_type *t, + struct lustre_cfg *cfg) +{ + struct filter_device *m; + struct lu_device *l; + int rc; + + OBD_ALLOC_PTR(m); + if (m == NULL) + return ERR_PTR(-ENOMEM); + + l = &m->ofd_dt_dev.dd_lu_dev; + rc = filter_init0(env, m, t, cfg); + if (rc != 0) { + OBD_FREE_PTR(m); + l = ERR_PTR(rc); + } + + return l; +} + +/* thread context key constructor/destructor */ +LU_KEY_INIT_FINI(filter, struct filter_thread_info); +//LU_CONTEXT_KEY_DEFINE(filter, LCT_DT_THREAD); +static void filter_key_exit(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ + struct filter_thread_info *info = data; + memset(info, 0, sizeof(*info)); +} + +struct lu_context_key filter_thread_key = { + .lct_tags = LCT_DT_THREAD, + .lct_init = filter_key_init, + .lct_fini = filter_key_fini, + .lct_exit = filter_key_exit +}; + +/* transaction context key */ +LU_KEY_INIT_FINI(filter_txn, struct filter_txn_info); +LU_CONTEXT_KEY_DEFINE(filter_txn, LCT_TX_HANDLE); + +/* type constructor/destructor: mdt_type_init, mdt_type_fini */ +LU_TYPE_INIT_FINI(filter, 
&filter_thread_key, &filter_txn_thread_key); + +static struct lu_device_type_operations filter_device_type_ops = { + .ldto_init = filter_type_init, + .ldto_fini = filter_type_fini, + + .ldto_start = filter_type_start, + .ldto_stop = filter_type_stop, + + .ldto_device_alloc = filter_device_alloc, + .ldto_device_free = filter_device_free, + .ldto_device_fini = filter_device_fini +}; + +static struct lu_device_type filter_device_type = { + .ldt_tags = LU_DEVICE_DT, + .ldt_name = LUSTRE_OST_NAME, + .ldt_ops = &filter_device_type_ops, + .ldt_ctx_tags = LCT_DT_THREAD +}; + +quota_interface_t *filter_quota_interface_ref; +extern quota_interface_t filter_quota_interface; +extern struct obd_ops filter_obd_ops; + +int __init ofd_init(void) +{ + struct lprocfs_static_vars lvars; + int rc; + + lprocfs_filter_init_vars(&lvars); + + request_module("lquota"); + + rc = ofd_fmd_init(); + if (rc) + GOTO(out, rc); + + //filter_quota_interface_ref = PORTAL_SYMBOL_GET(filter_quota_interface); + init_obd_quota_ops(filter_quota_interface_ref, &filter_obd_ops); + + rc = class_register_type(&filter_obd_ops, NULL, lvars.module_vars, + LUSTRE_OST_NAME, &filter_device_type); + if (rc) { + ofd_fmd_exit(); +out: + if (filter_quota_interface_ref) + PORTAL_SYMBOL_PUT(filter_quota_interface); + } + + return rc; +} + +void __exit ofd_exit(void) +{ + if (filter_quota_interface_ref) + PORTAL_SYMBOL_PUT(filter_quota_interface); + + ofd_fmd_exit(); + + class_unregister_type(LUSTRE_OST_NAME); +} + +MODULE_AUTHOR("Cluster File Systems, Inc. 
"); +MODULE_DESCRIPTION("Lustre Filtering driver"); +MODULE_LICENSE("GPL"); + +module_init(ofd_init); +module_exit(ofd_exit); diff --git a/lustre/ofd/ofd_fmd.c b/lustre/ofd/ofd_fmd.c new file mode 100644 index 0000000..525b525 --- /dev/null +++ b/lustre/ofd/ofd_fmd.c @@ -0,0 +1,231 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/obdfilter/filter_fmd.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include + +#include "ofd_internal.h" + +cfs_mem_cache_t *ll_fmd_cachep; + +/* drop fmd reference, free it if last ref. must be called with fed_lock held.*/ +static inline void filter_fmd_put_nolock(struct obd_export *exp, + struct filter_mod_data *fmd) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + + LASSERT_SPIN_LOCKED(&fed->fed_lock); + if (--fmd->fmd_refcount == 0) { + /* XXX when we have persistent reservations and the handle + * is stored herein we need to drop it here. 
*/ + fed->fed_mod_count--; + list_del(&fmd->fmd_list); + OBD_SLAB_FREE(fmd, ll_fmd_cachep, sizeof(*fmd)); + } +} + +/* drop fmd reference, free it if last ref */ +void filter_fmd_put(struct obd_export *exp, struct filter_mod_data *fmd) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + + if (fmd == NULL) + return; + + spin_lock(&fed->fed_lock); + filter_fmd_put_nolock(exp, fmd); /* caller reference */ + spin_unlock(&fed->fed_lock); +} + +/* expire entries from the end of the list if there are too many + * or they are too old */ +static void filter_fmd_expire_nolock(struct obd_export *exp, + struct filter_mod_data *keep) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_device *ofd = filter_exp(exp); + + struct filter_mod_data *fmd, *tmp; + cfs_time_t now = cfs_time_current(); + + list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { + if (fmd == keep) + break; + + if (cfs_time_before(now, fmd->fmd_expire) && + fed->fed_mod_count < ofd->ofd_fmd_max_num) + break; + + list_del_init(&fmd->fmd_list); + filter_fmd_put_nolock(exp, fmd); /* list reference */ + } +} + +void filter_fmd_expire(struct obd_export *exp) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + + spin_lock(&fed->fed_lock); + filter_fmd_expire_nolock(exp, NULL); + spin_unlock(&fed->fed_lock); +} + +/* find specified fid in fed_fmd_list. 
+ * caller must hold fed_lock and take fmd reference itself */ +static struct filter_mod_data *filter_fmd_find_nolock(struct obd_export *exp, + const struct lu_fid *fid) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_mod_data *found = NULL, *fmd; + struct filter_device *ofd = filter_exp(exp); + cfs_time_t now = cfs_time_current(); + + LASSERT_SPIN_LOCKED(&fed->fed_lock); + + list_for_each_entry_reverse(fmd, &fed->fed_mod_list, fmd_list) { + if (lu_fid_eq(&fmd->fmd_fid, fid)) { + found = fmd; + list_del(&fmd->fmd_list); + list_add_tail(&fmd->fmd_list, &fed->fed_mod_list); + fmd->fmd_expire = cfs_time_add(now, ofd->ofd_fmd_max_age); + break; + } + } + + filter_fmd_expire_nolock(exp, found); + + return found; +} + +/* Find fmd based on fid or return NULL if not found. */ +struct filter_mod_data *filter_fmd_find(struct obd_export *exp, + struct lu_fid *fid) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_mod_data *fmd; + + spin_lock(&fed->fed_lock); + fmd = filter_fmd_find_nolock(exp, fid); + if (fmd) + fmd->fmd_refcount++; /* caller reference */ + spin_unlock(&fed->fed_lock); + + return fmd; +} + +/* Find fmd based on FID, or create a new one if none is found. + * It is possible for this function to return NULL under memory pressure, + * or if fid = 0 is passed (which will only cause old entries to expire). + * Currently this is not fatal because any fmd state is transient and + * may also be freed when it gets sufficiently old. 
*/ +struct filter_mod_data *filter_fmd_get(struct obd_export *exp, + struct lu_fid *fid) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_device *ofd = filter_exp(exp); + struct filter_mod_data *found = NULL, *fmd_new = NULL; + cfs_time_t now = cfs_time_current(); + + OBD_SLAB_ALLOC(fmd_new, ll_fmd_cachep, CFS_ALLOC_IO, sizeof(*fmd_new)); + + spin_lock(&fed->fed_lock); + found = filter_fmd_find_nolock(exp, fid); + if (fmd_new) { + if (found == NULL) { + list_add_tail(&fmd_new->fmd_list, &fed->fed_mod_list); + fmd_new->fmd_fid = *fid; + fmd_new->fmd_refcount++; /* list reference */ + found = fmd_new; + fed->fed_mod_count++; + } else { + OBD_SLAB_FREE(fmd_new, ll_fmd_cachep, sizeof(*fmd_new)); + } + } + if (found) { + found->fmd_refcount++; /* caller reference */ + found->fmd_expire = cfs_time_add(now, ofd->ofd_fmd_max_age); + } + + spin_unlock(&fed->fed_lock); + + return found; +} + +#ifdef DO_FMD_DROP +/* drop fmd list reference so it will disappear when last reference is put. + * This isn't so critical because it would in fact only affect the one client + * that is doing the unlink and at worst we have an stale entry referencing + * an object that should never be used again. 
*/ +void filter_fmd_drop(struct obd_export *exp, struct lu_fid *fid) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_mod_data *found = NULL; + + spin_lock(&fed->fed_lock); + found = filter_fmd_find_nolock(exp, fid); + if (found) { + list_del_init(&found->fmd_list); + filter_fmd_put_nolock(exp, found); + } + spin_unlock(&fed->fed_lock); +} +#endif + +/* remove all entries from fmd list */ +void filter_fmd_cleanup(struct obd_export *exp) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + struct filter_mod_data *fmd = NULL, *tmp; + + spin_lock(&fed->fed_lock); + list_for_each_entry_safe(fmd, tmp, &fed->fed_mod_list, fmd_list) { + list_del_init(&fmd->fmd_list); + if (fmd->fmd_refcount > 1) { + CDEBUG(D_INFO, "fmd %p still referenced (refcount = %d)\n", + fmd, fmd->fmd_refcount); + } + filter_fmd_put_nolock(exp, fmd); + } + spin_unlock(&fed->fed_lock); +} + +int ofd_fmd_init(void) +{ + ll_fmd_cachep = cfs_mem_cache_create("ll_fmd_cache", + sizeof(struct filter_mod_data), + 0, 0); + if (!ll_fmd_cachep) + return -ENOMEM; + else + return 0; +} + +void ofd_fmd_exit(void) +{ + if (ll_fmd_cachep) { + int rc = cfs_mem_cache_destroy(ll_fmd_cachep); + LASSERTF(rc == 0, "Cannot destroy ll_fmd_cachep: rc %d\n", rc); + ll_fmd_cachep = NULL; + } +} diff --git a/lustre/ofd/ofd_fs.c b/lustre/ofd/ofd_fs.c new file mode 100644 index 0000000..409499a --- /dev/null +++ b/lustre/ofd/ofd_fs.c @@ -0,0 +1,557 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/filter_fs.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Alex Tomas + * Author: Mike Pershin + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. 
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * Return the in-memory last object id of @group, read under
+ * ofd_objid_lock.
+ *
+ * NOTE(review): the assertion accepts group == ofd_max_group, while
+ * filter_groups_init() sets ofd_max_group to the number of ids it loaded;
+ * confirm whether ofd_max_group is meant as a count or a highest index.
+ */
+obd_id filter_last_id(struct filter_device *ofd, obd_gr group)
+{
+        obd_id id;
+
+        LASSERT(group <= ofd->ofd_max_group);
+
+        spin_lock(&ofd->ofd_objid_lock);
+        id = ofd->ofd_last_objids[group];
+        spin_unlock(&ofd->ofd_objid_lock);
+
+        return id;
+}
+
+/*
+ * Raise the in-memory last object id of @group to @id.  The stored value
+ * never decreases: an @id that is not greater than the current one leaves
+ * it unchanged and only produces a warning.
+ */
+void filter_last_id_set(struct filter_device *ofd, obd_id id, obd_gr group)
+{
+        LASSERT(group <= ofd->ofd_max_group);
+        spin_lock(&ofd->ofd_objid_lock);
+        if (ofd->ofd_last_objids[group] < id)
+                ofd->ofd_last_objids[group] = id;
+        else
+                CWARN("Group ID "LPU64" is greater then new one "LPU64"!\n",
+                      ofd->ofd_last_objids[group], id);
+        spin_unlock(&ofd->ofd_objid_lock);
+}
+
+/*
+ * Read the little-endian last object id of @group from the groups file
+ * (offset group * sizeof(obd_id)) and feed it to filter_last_id_set().
+ * Returns the result of dt_record_read(); a negative value is logged.
+ */
+int filter_last_id_read(const struct lu_env *env, struct filter_device *ofd,
+                        obd_gr group)
+{
+        struct filter_thread_info *info = filter_info(env);
+        obd_id tmp;
+        int rc;
+        ENTRY;
+
+        LASSERT(ofd->ofd_groups_file != NULL);
+        LASSERT(info);
+
+        info->fti_buf.lb_buf = &tmp;
+        info->fti_buf.lb_len = sizeof(tmp);
+        info->fti_off = group * sizeof(tmp);
+
+        rc = dt_record_read(env, ofd->ofd_groups_file, &info->fti_buf,
+                            &info->fti_off, NULL);
+        if (rc >= 0) {
+                filter_last_id_set(ofd, le64_to_cpu(tmp), group);
+                CDEBUG(D_INODE, "%s: read last_objid for group "LPU64": "
+                       LPU64"\n", filter_obd(ofd)->obd_name, group,
+                       filter_last_id(ofd, group));
+        } else
+                CERROR("read group "LPU64" last objid: rc = %d\n", group, rc);
+
+        RETURN(rc);
+}
+
+/*
+ * Store the in-memory last object id of @group into the groups file, in
+ * little-endian form, inside its own transaction.  @force_sync is not
+ * referenced by the body.  Returns 0 or a negative errno.
+ */
+int filter_last_id_write(const struct lu_env *env, struct filter_device *ofd,
+                         obd_gr group, int force_sync)
+{
+        struct filter_thread_info *info = filter_info(env);
+        struct thandle *th;
+        obd_id tmp;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_INODE, "%s: write last_objid for group "LPU64": "LPU64"\n",
+               filter_obd(ofd)->obd_name, group, filter_last_id(ofd, group));
+
+        LASSERT(ofd->ofd_groups_file != NULL);
+
+        tmp = cpu_to_le64(filter_last_id(ofd, group));
+        info->fti_buf.lb_buf = &tmp;
+        info->fti_buf.lb_len = sizeof(tmp);
+        info->fti_off = group * sizeof(tmp);
+
+        th = filter_trans_create(env, ofd);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+        rc = dt_declare_record_write(env, ofd->ofd_groups_file, info->fti_off,
+                                     info->fti_buf.lb_len, th, BYPASS_CAPA);
+        LASSERT(rc == 0);
+        rc = filter_trans_start(env, ofd, th);
+        /* NOTE(review): th is not released on this error path; confirm
+         * whether filter_trans_stop() must be called after a failed start. */
+        if (rc)
+                RETURN(rc);
+
+        rc = dt_record_write(env, ofd->ofd_groups_file, &info->fti_buf,
+                             &info->fti_off, th, 1);
+        if (rc)
+                CERROR("write group "LPU64" last objid: rc = %d\n", group, rc);
+
+        filter_trans_stop(env, ofd, th);
+
+        RETURN(rc);
+}
+
+/*
+ * filter groups management
+ *
+ * Initialise ofd_objid_lock and load the per-group last object ids from
+ * the groups file into ofd_last_objids[].  The file is an array of
+ * little-endian obd_id values; its size fixes ofd_max_group.
+ *
+ * Returns 0 on success (including an empty groups file), -EIO if the file
+ * size is not a multiple of sizeof(obd_id), or the error reported by
+ * dt_attr_get() / dt_record_read().
+ */
+int filter_groups_init(const struct lu_env *env, struct filter_device *ofd)
+{
+        struct filter_thread_info *info = filter_info(env);
+        unsigned long groups_size;
+        obd_id lastid;
+        int rc, i;
+        ENTRY;
+
+        spin_lock_init(&ofd->ofd_objid_lock);
+
+        rc = dt_attr_get(env, ofd->ofd_groups_file, &info->fti_attr,
+                         BYPASS_CAPA);
+        if (rc)
+                RETURN(rc);
+
+        groups_size = (unsigned long)info->fti_attr.la_size;
+
+        if (groups_size == 0) {
+                CWARN("%s: no groups yet\n", filter_obd(ofd)->obd_name);
+                RETURN(0);
+        }
+
+        if (groups_size % sizeof(lastid) != 0) {
+                CERROR("groups file is corrupted? size = %lu\n", groups_size);
+                RETURN(-EIO);
+        }
+
+        ofd->ofd_max_group = groups_size / sizeof(lastid);
+        LASSERT(ofd->ofd_max_group <= FILTER_MAX_GROUPS); /* XXX: dynamic? */
+
+        info->fti_off = 0;
+        info->fti_buf.lb_buf = &ofd->ofd_last_objids;
+        info->fti_buf.lb_len = sizeof(lastid) * ofd->ofd_max_group;
+        rc = dt_record_read(env, ofd->ofd_groups_file, &info->fti_buf,
+                            &info->fti_off, NULL);
+        if (rc) {
+                CERROR("can't initialize last_ids: %d\n", rc);
+                RETURN(rc);
+        }
+        /* last objids are stored in le format, convert them to cpu.
+         * Only ofd_max_group entries were read: groups_size is a byte
+         * count and must not be used as the element bound. */
+        spin_lock(&ofd->ofd_objid_lock);
+        for (i = 0; i < ofd->ofd_max_group; i++)
+                ofd->ofd_last_objids[i] = le64_to_cpu(ofd->ofd_last_objids[i]);
+        spin_unlock(&ofd->ofd_objid_lock);
+
+        CWARN("%s: %u groups initialized\n", filter_obd(ofd)->obd_name,
+              ofd->ofd_max_group);
+        RETURN(0);
+}
+
+/* Copy a server-data record from @buf to @lsd, converting every integer
+ * field from little-endian to CPU order; the uuid arrays are copied as-is. */
+static inline void fsd_le_to_cpu(struct lr_server_data *buf,
+                                 struct lr_server_data *lsd)
+{
+        memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid));
+        lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno);
+        lsd->lsd_compat14 = le64_to_cpu(buf->lsd_compat14);
+        lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count);
+        lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat);
+        lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat);
+        lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat);
+        lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size);
+        lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start);
+        lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size);
+        lsd->lsd_subdir_count = le16_to_cpu(buf->lsd_subdir_count);
+        lsd->lsd_catalog_oid = le64_to_cpu(buf->lsd_catalog_oid);
+        lsd->lsd_catalog_ogen = le32_to_cpu(buf->lsd_catalog_ogen);
+        memcpy(lsd->lsd_peeruuid, buf->lsd_peeruuid,
+               sizeof (lsd->lsd_peeruuid));
+        lsd->lsd_ost_index = le32_to_cpu(buf->lsd_ost_index);
+        lsd->lsd_mdt_index = le32_to_cpu(buf->lsd_mdt_index);
+}
+
+/* Inverse of fsd_le_to_cpu(): copy @lsd into @buf in little-endian form. */
+static inline void fsd_cpu_to_le(struct lr_server_data *lsd,
+                                 struct lr_server_data *buf)
+{
+        memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (lsd->lsd_uuid));
+        buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno);
+        buf->lsd_compat14 = cpu_to_le64(lsd->lsd_compat14);
+        buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count);
+        buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat);
+        buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat);
+        buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat);
+        buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size);
+        buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start);
+        buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size);
+        buf->lsd_subdir_count = cpu_to_le16(lsd->lsd_subdir_count);
+        buf->lsd_catalog_oid = cpu_to_le64(lsd->lsd_catalog_oid);
+        buf->lsd_catalog_ogen = cpu_to_le32(lsd->lsd_catalog_ogen);
+        memcpy(buf->lsd_peeruuid, lsd->lsd_peeruuid,
+               sizeof (lsd->lsd_peeruuid));
+        buf->lsd_ost_index = cpu_to_le32(lsd->lsd_ost_index);
+        buf->lsd_mdt_index = cpu_to_le32(lsd->lsd_mdt_index);
+}
+
+/* Convert the uuid, last_transno and last_xid of a client record from
+ * little-endian @buf into @lcd; no other field is touched. */
+static inline void lcd_le_to_cpu(struct lsd_client_data *buf,
+                                 struct lsd_client_data *lcd)
+{
+        memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid));
+        lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno);
+        lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid);
+#if 0
+        lcd->lcd_group = le32_to_cpu(buf->lcd_group);
+#endif
+}
+
+/* Inverse of lcd_le_to_cpu(): fill the same three fields of @buf from @lcd. */
+static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
+                                 struct lsd_client_data *buf)
+{
+        memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid));
+        buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno);
+        buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid);
+#if 0
+        buf->lcd_group = cpu_to_le32(lcd->lcd_group);
+#endif
+}
+
+/* Read the server-data header from offset 0 of last_rcvd into the
+ * per-thread scratch buffer and, on success, convert it into ofd_fsd. */
+static int filter_last_rcvd_header_read(const struct lu_env *env,
+                                        struct filter_device *ofd)
+{
+        struct filter_thread_info *info = filter_info(env);
+        int rc;
+
+        info->fti_off = 0;
+        info->fti_buf.lb_buf = &info->fti_fsd;
+        info->fti_buf.lb_len = sizeof(info->fti_fsd);
+
+        rc = dt_record_read(env, ofd->ofd_last_rcvd, &info->fti_buf,
+                            &info->fti_off, BYPASS_CAPA);
+        if (rc == 0)
+                fsd_le_to_cpu(&info->fti_fsd, &ofd->ofd_fsd);
+        return rc;
+}
+
+/* Convert ofd_fsd to little-endian in the per-thread scratch buffer and
+ * write it at offset 0 of last_rcvd as part of transaction @th. */
+int filter_last_rcvd_header_write(const struct lu_env *env,
+                                  struct filter_device *ofd,
+                                  struct thandle *th)
+{
+        struct filter_thread_info *info = filter_info(env);
+        int rc;
+        ENTRY;
+
+        info->fti_buf.lb_buf = &info->fti_fsd;
+        info->fti_buf.lb_len = sizeof(info->fti_fsd);
+        info->fti_off = 0;
+
+        fsd_cpu_to_le(&ofd->ofd_fsd, &info->fti_fsd);
+
+        rc = dt_record_write(env, ofd->ofd_last_rcvd, &info->fti_buf,
+                             &info->fti_off, th, 1);
+        CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
+               "uuid = %s\nlast_transno = "LPU64"\n",
+               rc, ofd->ofd_fsd.lsd_uuid, ofd->ofd_fsd.lsd_last_transno);
+
+        RETURN(rc);
+}
+
+/*
+ * Read one client record from last_rcvd at *@off into @lcd (CPU order).
+ * The fti_fsd scratch area is reused as the raw buffer, but only
+ * sizeof(*lcd) bytes are transferred so that a single client slot is read.
+ */
+static int filter_last_rcvd_read(const struct lu_env *env,
+                                 struct filter_device *ofd,
+                                 struct lsd_client_data *lcd, loff_t *off)
+{
+        struct filter_thread_info *info = filter_info(env);
+        int rc;
+
+        /* the scratch buffer must be able to hold one client record */
+        CLASSERT(sizeof(*lcd) <= sizeof(info->fti_fsd));
+
+        info->fti_buf.lb_buf = &info->fti_fsd;
+        info->fti_buf.lb_len = sizeof(*lcd);
+
+        rc = dt_record_read(env, ofd->ofd_last_rcvd, &info->fti_buf,
+                            off, BYPASS_CAPA);
+        if (rc == 0)
+                lcd_le_to_cpu((struct lsd_client_data *) &info->fti_fsd, lcd);
+        return rc;
+}
+
+/*
+ * Write the client record @lcd to last_rcvd at *@off within transaction
+ * @th.  Exactly sizeof(*lcd) bytes are written so that neighbouring client
+ * slots are left alone.
+ */
+int filter_last_rcvd_write(const struct lu_env *env,
+                           struct filter_device *ofd,
+                           struct lsd_client_data *lcd,
+                           loff_t *off, struct thandle *th)
+{
+        struct filter_thread_info *info = filter_info(env);
+        int rc;
+
+        /* the scratch buffer must be able to hold one client record */
+        CLASSERT(sizeof(*lcd) <= sizeof(info->fti_fsd));
+
+        lcd_cpu_to_le(lcd, (struct lsd_client_data *) &info->fti_fsd);
+
+        info->fti_buf.lb_buf = &info->fti_fsd;
+        info->fti_buf.lb_len = sizeof(*lcd);
+
+        rc = dt_record_write(env, ofd->ofd_last_rcvd, &info->fti_buf, off, th, 1);
+        return rc;
+}
+
+/*
+ * Walk the client slots of last_rcvd (file size @fsize) and create an
+ * export for every slot with a non-empty uuid.  The highest lcd_last_transno
+ * seen is folded into ofd_fsd.lsd_last_transno.
+ *
+ * A read error stops the scan but is not reported to the caller.  Returns 0,
+ * -ENOMEM, or the error from class_new_export() (other than -EALREADY,
+ * which only zeroes the in-memory uuid of the duplicate record).
+ */
+static inline int filter_clients_data_init(const struct lu_env *env,
+                                           struct filter_device *ofd,
+                                           unsigned long fsize)
+{
+        struct obd_device *obd = filter_obd(ofd);
+        struct lr_server_data *fsd = &ofd->ofd_fsd;
+        struct lsd_client_data *lcd = NULL;
+        struct filter_export_data *fed;
+        int cl_idx, rc = 0;
+        loff_t off = fsd->lsd_client_start;
+        ENTRY;
+
+        CLASSERT (offsetof(struct lsd_client_data, lcd_padding) +
+                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+
+        for (cl_idx = 0; off < fsize; cl_idx++) {
+                struct obd_export *exp;
+                __u64 last_rcvd;
+
+                if (!lcd) {
+                        OBD_ALLOC_PTR(lcd);
+                        if (!lcd)
+                                GOTO(err_out, rc = -ENOMEM);
+                }
+
+                /* Don't assume off is incremented properly by
+                 * fsfilt_read_record(), in case sizeof(*lcd)
+                 * isn't the same as fsd->lsd_client_size. */
+                off = fsd->lsd_client_start + cl_idx * fsd->lsd_client_size;
+                rc = filter_last_rcvd_read(env, ofd, lcd, &off);
+                if (rc) {
+                        CERROR("error reading FILT %s idx %d off %llu: rc %d\n",
+                               LAST_RCVD, cl_idx, off, rc);
+                        rc = 0;
+                        break; /* read error shouldn't cause startup to fail */
+                }
+
+                if (lcd->lcd_uuid[0] == '\0') {
+                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                               cl_idx);
+                        continue;
+                }
+
+                last_rcvd = lcd->lcd_last_transno;
+
+                /* These exports are cleaned up by filter_disconnect(), so they
+                 * need to be set up like real exports as filter_connect() does.
+                 */
+                exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
+
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx,
+                       last_rcvd, fsd->lsd_last_transno);
+
+                if (IS_ERR(exp)) {
+                        if (PTR_ERR(exp) == -EALREADY) {
+                                /* export already exists, zero out this one */
+                                CERROR("Zeroing out duplicate export due to "
+                                       "bug 10479.\n");
+                                lcd->lcd_uuid[0] = '\0';
+                        } else {
+                                GOTO(err_out, rc = PTR_ERR(exp));
+                        }
+                } else {
+                        fed = &exp->exp_filter_data;
+                        fed->fed_lcd = lcd;
+#if 0
+                        fed->fed_group = lcd->lcd_group;
+#endif
+                        filter_export_stats_init(ofd, exp, NULL);
+                        rc = filter_client_add(env, ofd, fed, cl_idx);
+                        LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
+                        /* the export owns this record now; allocate a fresh
+                         * one on the next iteration */
+                        lcd = NULL;
+                        spin_lock(&exp->exp_lock);
+                        exp->exp_connecting = 0;
+                        exp->exp_in_recovery = 0;
+                        spin_unlock(&exp->exp_lock);
+                        obd->obd_max_recoverable_clients++;
+                        class_export_put(exp);
+                }
+
+                /* Need to check last_rcvd even for duplicated exports. */
+                CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
+                       cl_idx, last_rcvd);
+
+                spin_lock(&ofd->ofd_transno_lock);
+                if (last_rcvd > fsd->lsd_last_transno)
+                        fsd->lsd_last_transno = last_rcvd;
+                spin_unlock(&ofd->ofd_transno_lock);
+        }
+
+err_out:
+        /* a record that was never handed to an export is still ours,
+         * on the error path as well as on normal loop exit */
+        if (lcd)
+                OBD_FREE(lcd, sizeof(*lcd));
+        RETURN(rc);
+}
+
+/* Placeholder: calling it trips LBUG(). */
+void filter_free_server_data(void)
+{
+        LBUG();
+}
+
+/*
+ * Copy ofd_last_transno into the in-memory server data and, if last_rcvd
+ * is open, write the header back inside its own transaction.
+ * Returns 0 or a negative errno.
+ *
+ * NOTE(review): nothing in this file assigns ofd_last_transno; if it is
+ * still unset when filter_server_data_init() calls this, the
+ * lsd_last_transno recovered from the client records is overwritten.
+ * Confirm where ofd_last_transno is initialised.
+ */
+int filter_server_data_update(const struct lu_env *env,
+                              struct filter_device *ofd)
+{
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_SUPER, "OSS mount_count is "LPU64", last_transno is "LPU64"\n",
+               ofd->ofd_fsd.lsd_mount_count, ofd->ofd_fsd.lsd_last_transno);
+
+        spin_lock(&ofd->ofd_transno_lock);
+        ofd->ofd_fsd.lsd_last_transno = ofd->ofd_last_transno;
+        spin_unlock(&ofd->ofd_transno_lock);
+
+        /*
+         * This may be called from difficult reply handler and
+         * ofd->ofd_last_rcvd may be NULL at that time.
+         */
+        if (ofd->ofd_last_rcvd != NULL) {
+                struct thandle *th;
+
+                th = filter_trans_create(env, ofd);
+                if (IS_ERR(th))
+                        RETURN(PTR_ERR(th));
+                dt_declare_record_write(env, ofd->ofd_last_rcvd, 0,
+                                        sizeof(ofd->ofd_fsd), th, BYPASS_CAPA);
+                rc = filter_trans_start(env, ofd, th);
+                /* NOTE(review): th is not released on this error path;
+                 * confirm whether filter_trans_stop() is required here. */
+                if (rc)
+                        RETURN(rc);
+                rc = filter_last_rcvd_header_write(env, ofd, th);
+                filter_trans_stop(env, ofd, th);
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * Load (or, for an empty last_rcvd, initialise) the server data, check the
+ * uuid and feature flags, rebuild the client exports and write the updated
+ * header (mount count bumped by one) back to disk.
+ *
+ * Returns 0 on success.  After the initial dt_attr_get() every failure goes
+ * through class_disconnect_exports() and returns a negative errno.
+ */
+int filter_server_data_init(const struct lu_env *env,
+                            struct filter_device *ofd)
+{
+        struct filter_thread_info *info = filter_info(env);
+        struct lr_server_data *fsd = &ofd->ofd_fsd;
+        struct obd_device *obd = filter_obd(ofd);
+        unsigned long last_rcvd_size;
+#if 0
+        __u64 mount_count;
+#endif
+        int rc;
+        ENTRY;
+
+        rc = dt_attr_get(env, ofd->ofd_last_rcvd, &info->fti_attr,
+                         BYPASS_CAPA);
+        if (rc)
+                RETURN(rc);
+
+        last_rcvd_size = (unsigned long)info->fti_attr.la_size;
+
+        /* ensure padding in the struct is the correct size */
+        CLASSERT (offsetof(struct lr_server_data, lsd_padding) +
+                  sizeof(fsd->lsd_padding) == LR_SERVER_SIZE);
+
+        if (last_rcvd_size == 0) {
+                LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
+
+                memcpy(fsd->lsd_uuid, obd->obd_uuid.uuid,
+                       sizeof(fsd->lsd_uuid));
+                fsd->lsd_last_transno = 0;
+                fsd->lsd_mount_count = 0;
+                fsd->lsd_server_size = LR_SERVER_SIZE;
+                fsd->lsd_client_start = LR_CLIENT_START;
+                fsd->lsd_client_size = LR_CLIENT_SIZE;
+                fsd->lsd_subdir_count = FILTER_SUBDIR_COUNT;
+                fsd->lsd_feature_incompat = OBD_INCOMPAT_OST;
+        } else {
+                rc = filter_last_rcvd_header_read(env, ofd);
+                if (rc) {
+                        CDEBUG(D_INODE,"OBD filter: error reading %s: rc %d\n",
+                               LAST_RCVD, rc);
+                        GOTO(err_fsd, rc);
+                }
+                if (strcmp((char *) fsd->lsd_uuid, (char *) obd->obd_uuid.uuid)) {
+                        LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
+                                       " disk %s. Were the /dev/ assignments "
+                                       "rearranged?\n",
+                                       obd->obd_uuid.uuid, fsd->lsd_uuid);
+                        GOTO(err_fsd, rc = -EINVAL);
+                }
+        }
+
+        fsd->lsd_mount_count++;
+        ofd->ofd_subdir_count = fsd->lsd_subdir_count;
+
+        if (fsd->lsd_feature_incompat & ~FILTER_INCOMPAT_SUPP) {
+                CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+                       obd->obd_name,
+                       fsd->lsd_feature_incompat & ~FILTER_INCOMPAT_SUPP);
+                GOTO(err_fsd, rc = -EINVAL);
+        }
+        if (fsd->lsd_feature_rocompat & ~FILTER_ROCOMPAT_SUPP) {
+                CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+                       obd->obd_name,
+                       fsd->lsd_feature_rocompat & ~FILTER_ROCOMPAT_SUPP);
+                /* Do something like remount filesystem read-only */
+                GOTO(err_fsd, rc = -EINVAL);
+        }
+
+        CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
+               obd->obd_name, fsd->lsd_last_transno);
+        CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+               obd->obd_name, fsd->lsd_mount_count);
+        CDEBUG(D_INODE, "%s: server data size: %u\n",
+               obd->obd_name, fsd->lsd_server_size);
+        CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+               obd->obd_name, fsd->lsd_client_start);
+        CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+               obd->obd_name, fsd->lsd_client_size);
+        CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+               obd->obd_name, fsd->lsd_subdir_count);
+        CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+               last_rcvd_size <= fsd->lsd_client_start ? 0 :
+               (last_rcvd_size - fsd->lsd_client_start) /
+               fsd->lsd_client_size);
+
+        if (!obd->obd_replayable) {
+                CWARN("%s: recovery support OFF\n", obd->obd_name);
+        }
+
+        /* do not let the header update below hide a failure to rebuild
+         * the client exports */
+        rc = filter_clients_data_init(env, ofd, last_rcvd_size);
+        if (rc)
+                GOTO(err_fsd, rc);
+
+        spin_lock(&ofd->ofd_transno_lock);
+        obd->obd_last_committed = fsd->lsd_last_transno;
+        spin_unlock(&ofd->ofd_transno_lock);
+
+        /* save it, so mount count and last_transno is current */
+        rc = filter_server_data_update(env, ofd);
+        if (rc)
+                GOTO(err_fsd, rc);
+
+        RETURN(0);
+
+err_fsd:
+        class_disconnect_exports(obd);
+        RETURN(rc);
+}
+
diff --git a/lustre/ofd/ofd_grant.c b/lustre/ofd/ofd_grant.c
new file mode 100644
index 0000000..8d4558c
--- /dev/null
+++ b/lustre/ofd/ofd_grant.c
@@ -0,0 +1,515 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/ofd/ofd_grant.c
+ *
+ * Copyright (c) 2007 Cluster File Systems, Inc.
+ * Author: Atul Vidwansa
+ * Author: Mike Pershin
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include
+#include
+#include
+#include "ofd_internal.h"
+
+/* Do extra sanity checks for grant accounting. We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
+ * always get rid of it or turn it off when we know accounting is good.
+ *
+ * With obd_osfs_lock and obd_dev_lock held, the per-export grant, pending
+ * and dirty counters are summed over obd_exports; after the locks are
+ * dropped the sums are compared with the device-wide ofd_tot_* counters
+ * and with the filesystem size, and every mismatch is logged with @func
+ * as prefix. Nothing is done when there are no exports or more than 100.
+ */
+void filter_grant_sanity_check(struct obd_device *obd, const char *func)
+{
+        struct filter_export_data *fed;
+        struct filter_device *ofd = filter_dev(obd->obd_lu_dev);
+        struct obd_export *exp;
+        obd_size maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+        obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+        obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+
+        if (list_empty(&obd->obd_exports))
+                return;
+
+        /* We don't want to do this for large machines that do lots of
+         * mounts or unmounts. It burns... */
+        if (obd->obd_num_exports > 100)
+                return;
+
+        spin_lock(&obd->obd_osfs_lock);
+        spin_lock(&obd->obd_dev_lock);
+        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+                int error = 0;
+                fed = &exp->exp_filter_data;
+                if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
+                    fed->fed_dirty < 0)
+                        error = 1;
+                if (maxsize > 0) { /* we may not have done a statfs yet */
+                        /* note: these two assertions run with both
+                         * spinlocks held */
+                        LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+                                 "%s: cli %s/%p %ld+%ld > "LPU64"\n", func,
+                                 exp->exp_client_uuid.uuid, exp,
+                                 fed->fed_grant, fed->fed_pending, maxsize);
+                        LASSERTF(fed->fed_dirty <= maxsize,
+                                 "%s: cli %s/%p %ld > "LPU64"\n", func,
+                                 exp->exp_client_uuid.uuid, exp,
+                                 fed->fed_dirty, maxsize);
+                }
+                if (error)
+                        CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                               obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                               fed->fed_dirty, fed->fed_pending,fed->fed_grant);
+                else
+                        CDEBUG(D_CACHE, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                               obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                               fed->fed_dirty, fed->fed_pending,fed->fed_grant);
+                /* the granted total counts pending space as well */
+                tot_granted += fed->fed_grant + fed->fed_pending;
+                tot_pending += fed->fed_pending;
+                tot_dirty += fed->fed_dirty;
+        }
+        /* snapshot the device totals under the same locks */
+        fo_tot_granted = ofd->ofd_tot_granted;
+        fo_tot_pending = ofd->ofd_tot_pending;
+        fo_tot_dirty = ofd->ofd_tot_dirty;
+        spin_unlock(&obd->obd_dev_lock);
+        spin_unlock(&obd->obd_osfs_lock);
+
+        /* Do these assertions outside the spinlocks so we don't kill system */
+        if (tot_granted != fo_tot_granted)
+                CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
+                       func, tot_granted, fo_tot_granted);
+        if (tot_pending != fo_tot_pending)
+                CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
+                       func, tot_pending, fo_tot_pending);
+        if (tot_dirty != fo_tot_dirty)
+                CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
+                       func, tot_dirty, fo_tot_dirty);
+        if (tot_pending > tot_granted)
+                CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
+                       func, tot_pending, tot_granted);
+        if (tot_granted > maxsize)
+                CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
+                       func, tot_granted, maxsize);
+        if (tot_dirty > maxsize)
+                CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
+                       func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals. We also remove
+ * the export from the obd device under the osfs and dev locks to ensure
+ * that the filter_grant_sanity_check() calculations are always valid.
+ * The client should do something similar when it invalidates its import.
+ */
+void filter_grant_discard(struct obd_export *exp)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_device *ofd = filter_exp(exp);
+        struct filter_export_data *fed = &exp->exp_filter_data;
+
+        spin_lock(&obd->obd_osfs_lock);
+        spin_lock(&obd->obd_dev_lock);
+        list_del_init(&exp->exp_obd_chain); /* unlink from obd_exports */
+        spin_unlock(&obd->obd_dev_lock);
+
+        LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
+                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
+                 obd->obd_name, ofd->ofd_tot_granted,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+        ofd->ofd_tot_granted -= fed->fed_grant;
+        LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
+                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
+                 obd->obd_name, ofd->ofd_tot_pending,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+        /* ofd_tot_pending is handled in filter_grant_commit as bulk finishes;
+         * only checked here, neither it nor fed_pending is modified */
+        LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
+                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
+                 obd->obd_name, ofd->ofd_tot_dirty,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+        ofd->ofd_tot_dirty -= fed->fed_dirty;
+        fed->fed_dirty = 0;
+        fed->fed_grant = 0;
+        spin_unlock(&obd->obd_osfs_lock);
+}
+
+/*
+ * Grab the dirty and seen grant announcements from the incoming obdo.
+ * We will later calculate the clients new grant and return it.
+ * Caller must hold osfs lock.
+ *
+ * Unless both OBD_MD_FLBLOCKS and OBD_MD_FLGRANT are set in o_valid, the
+ * only effect is that OBD_MD_FLGRANT is cleared. Otherwise o_dirty is
+ * clamped to [0, fed_grant + 4 * FILTER_GRANT_CHUNK] and becomes the new
+ * fed_dirty, and o_dropped is subtracted from both the export's and the
+ * device's grant; an o_dropped larger than either of them is reset to 0.
+ * If any per-export counter ends up negative, obd_osfs_lock is released
+ * and LBUG() is hit.
+ */
+void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
+{
+        struct filter_export_data *fed;
+        struct filter_device *ofd = filter_exp(exp);
+        struct obd_device *obd = exp->exp_obd;
+        ENTRY;
+
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
+        /* Check if OST supports grants (OBD_MD_FLGRANT) or allocated block
+         * flag is set.
+         */
+        if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
+            (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
+                oa->o_valid &= ~OBD_MD_FLGRANT;
+                EXIT;
+                return;
+        }
+
+        fed = &exp->exp_filter_data;
+
+        /* Add some margin, since there is a small race if other RPCs arrive
+         * out-or-order and have already consumed some grant. We want to
+         * leave this here in case there is a large error in accounting.
+         */
+        CDEBUG(D_CACHE,
+               "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
+               oa->o_dropped, fed->fed_grant);
+
+        /* Update our accounting now so that statfs takes it into account.
+         * Note that fed_dirty is only approximate and can become incorrect
+         * if RPCs arrive out-of-order. No important calculations depend
+         * on fed_dirty however, but we must check sanity to not assert.
+         */
+        if ((long long)oa->o_dirty < 0)
+                oa->o_dirty = 0;
+        else if (oa->o_dirty > fed->fed_grant + 4 * FILTER_GRANT_CHUNK)
+                oa->o_dirty = fed->fed_grant + 4 * FILTER_GRANT_CHUNK;
+        /* replace this export's old contribution to the dirty total */
+        ofd->ofd_tot_dirty += oa->o_dirty - fed->fed_dirty;
+        if (fed->fed_grant < oa->o_dropped) {
+                CDEBUG(D_CACHE,"%s: cli %s/%p reports %u dropped > grant %lu\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       oa->o_dropped, fed->fed_grant);
+                oa->o_dropped = 0;
+        }
+        if (ofd->ofd_tot_granted < oa->o_dropped) {
+                CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       oa->o_dropped, ofd->ofd_tot_granted);
+                oa->o_dropped = 0;
+        }
+        ofd->ofd_tot_granted -= oa->o_dropped;
+        fed->fed_grant -= oa->o_dropped;
+        fed->fed_dirty = oa->o_dirty;
+        if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
+                CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+                spin_unlock(&obd->obd_osfs_lock);
+                LBUG();
+        }
+        EXIT;
+}
+
+/* Figure out how much space is available between what we've
granted
+ * and what remains in the filesystem. Compensate for ext3 indirect
+ * block overhead when computing how much free space is left ungranted.
+ * Caller must hold obd_osfs_lock.
+ *
+ * The cached obd_osfs is refreshed through dt_statfs() when it is older
+ * than HZ, or when the cached numbers leave less than
+ * 32 * FILTER_GRANT_CHUNK beyond what is already granted. The result is
+ * in bytes and is 0 when the granted total exceeds the computed space.
+ */
+obd_size filter_grant_space_left(const struct lu_env *env,
+                                 struct obd_export *exp)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_thread_info *info = filter_info(env);
+        obd_size tot_granted = ofd->ofd_tot_granted, avail, left = 0;
+        int statfs_done = 0;
+        long frsize;
+
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
+        if (cfs_time_before_64(obd->obd_osfs_age,
+                               cfs_time_current_64() - HZ)) {
+restat: /* also entered by the goto below, at most once (statfs_done) */
+                dt_statfs(env, ofd->ofd_osd, &info->fti_u.ksfs);
+                statfs_pack(&obd->obd_osfs, &info->fti_u.ksfs);
+                statfs_done = 1;
+        }
+        frsize = obd->obd_osfs.os_bsize;
+        avail = obd->obd_osfs.os_bavail; /* in fragments */
+        LASSERT(frsize);
+        /*
+         * Consider metadata overhead for allocating new blocks while
+         * calculating available space left.
+         */
+#if defined(LINUX)
+        left = avail - (avail / (frsize >> 3)); /* (d)indirect */
+#else
+        left = avail;
+#endif
+        if (left > GRANT_FOR_LLOG)
+                left = (left - GRANT_FOR_LLOG) * frsize;
+        else
+                left = 0;
+
+        if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
+                CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
+                goto restat;
+        }
+
+        /* bytes now, is obd_size enough for 'left'? */
+        if (left >= tot_granted) {
+                left -= tot_granted;
+        } else {
+                if (left < tot_granted - ofd->ofd_tot_pending) {
+                        CERROR("%s: cli %s/%p grant "LPU64" > available "
+                               LPU64" and pending "LPU64"\n", obd->obd_name,
+                               exp->exp_client_uuid.uuid, exp, tot_granted,
+                               left, ofd->ofd_tot_pending);
+                }
+                left = 0;
+        }
+
+        CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
+               " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
+               exp->exp_client_uuid.uuid, exp,
+               obd->obd_osfs.os_bfree * frsize, avail * frsize,
+               tot_granted, left, ofd->ofd_tot_pending);
+
+        return left;
+}
+
+/* Substract what client have used already. We don't subtract
+ * this from the tot_granted yet, so that other client's can't grab
+ * that space before we have actually allocated our blocks. That
+ * happens in filter_grant_commit() after the writes are done.
+ *
+ * *@used is moved from fed_grant to fed_pending and *@ungranted is added
+ * to fed_pending and to the device totals; *@left is reduced by
+ * *@ungranted. If the client claims more than fed_dirty, *@used is
+ * lowered to fed_dirty before the dirty counters are reduced.
+ * Caller must hold obd_osfs_lock. Always returns 0.
+ */
+int filter_grant_client_calc(struct obd_export *exp, obd_size *left,
+                             unsigned long *used, unsigned long *ungranted)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_export_data *fed = &exp->exp_filter_data;
+        unsigned long using = 0;
+        int rc = 0;
+
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
+        *left -= *ungranted;
+        LASSERT(fed->fed_grant >= *used);
+        fed->fed_grant -= *used;
+        fed->fed_pending += *used + *ungranted;
+        ofd->ofd_tot_granted += *ungranted;
+        ofd->ofd_tot_pending += *used + *ungranted;
+
+        CDEBUG(D_CACHE,
+               "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp, *used,
+               *ungranted, fed->fed_grant, fed->fed_dirty);
+
+        /* Rough calc in case we don't refresh cached statfs data,
+         * in fragments */
+        LASSERT(obd->obd_osfs.os_bsize);
+        using = ((*used + *ungranted + 1 ) / obd->obd_osfs.os_bsize);
+        if (obd->obd_osfs.os_bavail > using)
+                obd->obd_osfs.os_bavail -= using;
+        else
+                obd->obd_osfs.os_bavail = 0;
+
+        if (fed->fed_dirty < *used) {
+                CWARN("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      *used, fed->fed_dirty);
+                *used = fed->fed_dirty;
+        }
+        ofd->ofd_tot_dirty -= *used;
+        fed->fed_dirty -= *used;
+
+        if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
+                CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+                spin_unlock(&obd->obd_osfs_lock);
+                LBUG();
+        }
+        return rc;
+}
+
+/* When clients have dirtied as much space as they've been granted they
+ * fall through to sync writes. These sync writes haven't been expressed
+ * in grants and need to error with ENOSPC when there isn't room in the
+ * filesystem for them after grants are taken into account. However,
+ * writeback of the dirty data that was already granted space can write
+ * right on through.
+ * Caller must hold obd_osfs_lock.
+ *
+ * Every buffer that fits (from the client's grant, accumulated in *@used,
+ * or from free space, accumulated in *@ungranted) is flagged
+ * OBD_BRW_GRANTED and records its size in lnb_grant_used; a buffer that
+ * does not fit gets lnb[n].rc = -ENOSPC. Returns 0 if at least one buffer
+ * was accepted and -ENOSPC if none was.
+ */
+int filter_grant_check(const struct lu_env *env, struct obd_export *exp,
+                       struct obdo *oa, int objcount, struct obd_ioobj *objs,
+                       struct niobuf_remote *rnb, struct niobuf_local *lnb,
+                       obd_size *left, unsigned long *used, unsigned long *ungranted)
+{
+        struct filter_export_data *fed = &exp->exp_filter_data;
+        int i, rc = -ENOSPC, obj, n = 0;
+
+        LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
+
+        /* n runs over all buffers of all objects, i over one object's */
+        for (obj = 0; obj < objcount; obj++) {
+                for (i = 0; i < objs[obj].ioo_bufcnt; i++, n++) {
+                        int bytes;
+
+                        bytes = lnb[n].bytes;
+                        if ((rnb[n].flags & OBD_BRW_FROM_GRANT) &&
+                            (oa->o_valid & OBD_MD_FLGRANT)) {
+                                if (fed->fed_grant < *used + bytes) {
+                                        CDEBUG(D_CACHE,
+                                               "%s: cli %s/%p claims %ld+%d "
+                                               "GRANT, real grant %lu idx %d\n",
+                                               exp->exp_obd->obd_name,
+                                               exp->exp_client_uuid.uuid, exp,
+                                               *used, bytes, fed->fed_grant, n);
+                                } else {
+                                        *used += bytes;
+                                        rnb[n].flags |= OBD_BRW_GRANTED;
+                                        lnb[n].lnb_grant_used = bytes;
+                                        CDEBUG(0, "idx %d used=%lu\n", n, *used);
+                                        rc = 0;
+                                        continue;
+                                }
+                        }
+                        if (*left > *ungranted + bytes) {
+                                /* if enough space, pretend it was granted */
+                                *ungranted += bytes;
+                                rnb[n].flags |= OBD_BRW_GRANTED;
+                                lnb[n].lnb_grant_used = bytes;
+                                CDEBUG(0, "idx %d ungranted=%lu\n",n,*ungranted);
+                                rc = 0;
+                                continue;
+                        }
+
+                        /* We can't check for already-mapped blocks here, as
+                         * it requires dropping the osfs lock to do the bmap.
+                         * Instead, we return ENOSPC and in that case we need
+                         * to go through and verify if all of the blocks not
+                         * marked BRW_GRANTED are already mapped and we can
+                         * ignore this error.
+                         */
+                        lnb[n].rc = -ENOSPC;
+                        rnb[n].flags &= ~OBD_BRW_GRANTED;
+                        CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
+                               exp->exp_obd->obd_name,
+                               exp->exp_client_uuid.uuid, exp, n, bytes);
+                }
+        }
+
+        return rc;
+}
+
+/* Calculate how much grant space to allocate to this client, based on how
+ * much space is currently free and how much of that is already granted.
+ * Caller must hold obd_osfs_lock.
+ *
+ * Returns the amount added to fed_grant and ofd_tot_granted, 0 if nothing
+ * was granted. A request above 0x7fffffff is only logged. Outside of
+ * recovery a single call grants at most FILTER_GRANT_CHUNK.
+ */
+long filter_grant(const struct lu_env *env, struct obd_export *exp,
+                  obd_size current_grant, obd_size want,
+                  obd_size fs_space_left)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_device *ofd = filter_exp(exp);
+        struct filter_export_data *fed = &exp->exp_filter_data;
+        long frsize = obd->obd_osfs.os_bsize;
+        __u64 grant = 0;
+
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+        LASSERT(frsize);
+
+        /* Grant some fraction of the client's requested grant space so that
+         * they are not always waiting for write credits (not all of it to
+         * avoid overgranting in face of multiple RPCs in flight). This
+         * essentially will be able to control the OSC_MAX_RIF for a client.
+         *
+         * If we do have a large disparity between what the client thinks it
+         * has and what we think it has, don't grant very much and let the
+         * client consume its grant first. Either it just has lots of RPCs
+         * in flight, or it was evicted and its grants will soon be used up.
+         */
+        if (want > 0x7fffffff) {
+                CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
+        } else if (current_grant < want &&
+                   current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
+#if 0
+                grant = (min(want, fs_space_left >> 3) / frsize) * frsize;
+#else
+                /* rounding to a multiple of frsize is disabled above */
+                CERROR("not implemented yet\n");
+                grant = min(want, fs_space_left >> 3);
+#endif
+                if (grant) {
+                        /* Allow >FILTER_GRANT_CHUNK size when clients
+                         * reconnect due to a server reboot.
+                         */
+                        if ((grant > FILTER_GRANT_CHUNK) &&
+                            (!obd->obd_recovering))
+                                grant = FILTER_GRANT_CHUNK;
+
+                        ofd->ofd_tot_granted += grant;
+                        fed->fed_grant += grant;
+                        if (fed->fed_grant < 0) {
+                                CERROR("%s: cli %s/%p grant %ld want "LPU64
+                                       "current"LPU64"\n", obd->obd_name,
+                                       exp->exp_client_uuid.uuid, exp,
+                                       fed->fed_grant, want, current_grant);
+                                spin_unlock(&obd->obd_osfs_lock);
+                                LBUG();
+                        }
+                }
+        }
+
+        CDEBUG(D_CACHE,
+               "%s: cli %s/%p wants: "LPU64" current grant "LPU64
+               " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
+               exp, want, current_grant, grant);
+        CDEBUG(D_CACHE,
+               "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
+               " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
+               exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
+               obd->obd_num_exports);
+
+        return grant; /* __u64 local returned through a long */
+}
+
+/* Sum lnb_grant_used over the @niocount buffers in @res and, under
+ * obd_osfs_lock, subtract that amount from the export's fed_pending and
+ * from the device's ofd_tot_granted and ofd_tot_pending. Each counter is
+ * asserted to be at least that large before it is reduced.
+ */
+void filter_grant_commit(struct obd_export *exp, int niocount,
+                         struct niobuf_local *res)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct niobuf_local *lnb = res;
+        unsigned long pending = 0;
+        int i;
+
+        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        for (i = 0, lnb = res; i < niocount; i++, lnb++)
+                pending += lnb->lnb_grant_used;
+
+        LASSERTF(exp->exp_filter_data.fed_pending >= pending,
+                 "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
+                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                 exp->exp_filter_data.fed_pending, pending);
+        exp->exp_filter_data.fed_pending -= pending;
+        LASSERTF(ofd->ofd_tot_granted >= pending,
+                 "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
+                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                 ofd->ofd_tot_granted, pending);
+        ofd->ofd_tot_granted -= pending;
+        LASSERTF(ofd->ofd_tot_pending >= pending,
+                 "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
+                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                 ofd->ofd_tot_pending, pending);
+        ofd->ofd_tot_pending -= pending;
+
+        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+}
+
diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h
new file mode 100644
index 0000000..73400c8
--- /dev/null
+++ b/lustre/ofd/ofd_internal.h
@@ -0,0 +1,533 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef _FILTER_INTERNAL_H
+#define _FILTER_INTERNAL_H
+
+#ifdef __KERNEL__
+# include
+#endif
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+#define FILTER_GROUPS_FILE "groups"
+
+#define FILTER_LAYOUT_VERSION "2"
+
+#define FILTER_INIT_OBJID 0
+
+#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
+
+#define FILTER_ROCOMPAT_SUPP (0)
+
+#define FILTER_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS | OBD_INCOMPAT_OST | \
+                              OBD_INCOMPAT_COMMON_LR)
+
+#define FILTER_GRANT_CHUNK (2ULL * PTLRPC_MAX_BRW_SIZE)
+#define GRANT_FOR_LLOG 16
+
+#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
+
+extern struct file_operations filter_per_export_stats_fops;
+
+/* Data stored per client in the last_rcvd file. In le32 order.
*/ +struct filter_client_data { + __u8 fcd_uuid[40]; /* client UUID */ + __u64 fcd_last_rcvd; /* last completed transaction ID */ + __u64 fcd_last_xid; /* client RPC xid for the last transaction */ + __u32 fcd_group; /* mds group */ + __u8 fcd_padding[LR_CLIENT_SIZE - 60]; +}; + +/* Limit the returned fields marked valid to those that we actually might set */ +#define FILTER_VALID_FLAGS (LA_TYPE | LA_MODE | LA_SIZE | LA_BLOCKS | \ + LA_BLKSIZE | LA_ATIME | LA_MTIME | LA_CTIME) + +/* per-client-per-object persistent state (LRU) */ +struct filter_mod_data { + struct list_head fmd_list; /* linked to fed_mod_list */ + struct lu_fid fmd_fid; /* FID being written to */ + __u64 fmd_mactime_xid;/* xid highest {m,a,c}time setattr */ + cfs_time_t fmd_expire; /* time when the fmd should expire */ + int fmd_refcount; /* reference counter - list holds 1 */ +}; + +#ifdef BGL_SUPPORT +#define FILTER_FMD_MAX_NUM_DEFAULT 128 /* many active files per client on BGL */ +#else +#define FILTER_FMD_MAX_NUM_DEFAULT 32 +#endif +#define FILTER_FMD_MAX_AGE_DEFAULT ((obd_timeout + 10) * HZ) + +int ofd_fmd_init(void); +void ofd_fmd_exit(void); +struct filter_mod_data *filter_fmd_find(struct obd_export *exp, + struct lu_fid *fid); +struct filter_mod_data *filter_fmd_get(struct obd_export *exp, + struct lu_fid *fid); +void filter_fmd_put(struct obd_export *exp, struct filter_mod_data *fmd); +void filter_fmd_expire(struct obd_export *exp); +void filter_fmd_cleanup(struct obd_export *exp); +#ifdef DO_FMD_DROP +void filter_fmd_drop(struct obd_export *exp, struct lu_fid *fid); +#else +#define filter_fmd_drop(exp, fid) +#endif + +enum { + LPROC_FILTER_READ_BYTES = 0, + LPROC_FILTER_WRITE_BYTES = 1, + LPROC_FILTER_LAST, +}; + +//#define FILTER_MAX_CACHE_SIZE (32 * 1024 * 1024) /* was OBD_OBJECT_EOF */ +#define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF + +#ifdef LPROCFS +void filter_tally(struct obd_export *exp, struct page **pages, int nr_pages, + unsigned long *blocks, int blocks_per_page, int wr); +int 
lproc_filter_attach_seqstat(struct obd_device *dev); +void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars); +#else +static inline void filter_tally(struct obd_export *exp, struct page **pages, + int nr_pages, unsigned long *blocks, + int blocks_per_page, int wr) {} +static inline int lproc_filter_attach_seqstat(struct obd_device *dev) +{ + return 0; +} +static inline void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars) +{ + memset(lvars, 0, sizeof(*lvars)); +} +#endif + +/* Quota stuff */ +extern quota_interface_t *filter_quota_interface_ref; + +/* Capability */ + +void blacklist_add(uid_t uid); +void blacklist_del(uid_t uid); +int blacklist_display(char *buf, int bufsize); + +#define FILTER_MAX_GROUPS 256 + +struct filter_device { + struct dt_device ofd_dt_dev; + struct dt_device *ofd_osd; + struct dt_device_param ofd_dt_conf; + /* DLM name-space for meta-data locks maintained by this server */ + struct ldlm_namespace *ofd_namespace; + /* ptlrpc handle for OST->client connections (for lock ASTs). 
*/ + struct ptlrpc_client *ofd_ldlm_client; + + /* transaction callbacks */ + struct dt_txn_callback ofd_txn_cb; + spinlock_t ofd_transno_lock; + __u64 ofd_last_transno; + + /* last_rcvd file */ + struct dt_object *ofd_last_rcvd; + struct dt_object *ofd_groups_file; + struct lr_server_data ofd_fsd; + spinlock_t ofd_client_bitmap_lock; + unsigned long *ofd_last_rcvd_slots; + + int ofd_subdir_count; + + struct list_head ofd_llog_list; + spinlock_t ofd_llog_list_lock; + void *ofd_lcm; + + /* XXX: make the following dynamic */ + int ofd_max_group; + obd_id ofd_last_objids[FILTER_MAX_GROUPS]; + struct semaphore ofd_create_locks[FILTER_MAX_GROUPS]; + spinlock_t ofd_objid_lock; + unsigned long ofd_destroys_in_progress; + + /* grants: all values in bytes */ + spinlock_t ofd_grant_lock; + obd_size ofd_tot_dirty; + obd_size ofd_tot_granted; + obd_size ofd_tot_pending; + + /* filter mod data: filter_device wide values */ + int ofd_fmd_max_num; /* per ofd filter_mod_data */ + cfs_duration_t ofd_fmd_max_age; /* time to fmd expiry */ + + /* sptlrpc stuff */ + rwlock_t ofd_sptlrpc_lock; + struct sptlrpc_rule_set ofd_sptlrpc_rset; + + /* capability related */ + unsigned int ofd_fl_oss_capa; + struct list_head ofd_capa_keys; + struct hlist_head *ofd_capa_hash; +}; + +static inline struct filter_device *filter_dev(struct lu_device *d) +{ + return container_of0(d, struct filter_device, ofd_dt_dev.dd_lu_dev); +} + +static inline struct obd_device *filter_obd(struct filter_device *ofd) +{ + return ofd->ofd_dt_dev.dd_lu_dev.ld_obd; +} + +static inline struct filter_device *filter_exp(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + return filter_dev(obd->obd_lu_dev); +} + +struct filter_object { + struct lu_object_header ofo_header; + struct dt_object ofo_obj; +}; + +static inline struct filter_object *filter_obj(struct lu_object *o) +{ + return container_of0(o, struct filter_object, ofo_obj.do_lu); +} + +static inline int filter_object_exists(struct filter_object 
*obj) +{ + LASSERT(obj != NULL); + return lu_object_exists(&obj->ofo_obj.do_lu); +} + +static inline struct dt_object * fo2dt(struct filter_object *obj) +{ + return &obj->ofo_obj; +} + +static inline struct dt_object *filter_object_child(struct filter_object *_obj) +{ + struct lu_object *lu = &(_obj)->ofo_obj.do_lu; + return container_of0(lu_object_next(lu), struct dt_object, do_lu); +} + +static inline +struct filter_device *filter_obj2dev(const struct filter_object *fo) +{ + return filter_dev(fo->ofo_obj.do_lu.lo_dev); +} + +static inline +struct lustre_capa *filter_object_capa(const struct lu_env *env, + const struct filter_object *obj) +{ + /* TODO: see mdd_object_capa() */ + return BYPASS_CAPA; +} + +static inline void filter_write_lock(const struct lu_env *env, + struct filter_object *fo, int role) +{ + struct dt_object *next = filter_object_child(fo); + next->do_ops->do_write_lock(env, next, role); +} + +static inline void filter_write_unlock(const struct lu_env *env, + struct filter_object *fo) +{ + struct dt_object *next = filter_object_child(fo); + next->do_ops->do_write_unlock(env, next); +} + +/* + * Common data shared by obdfilter-level handlers. This is allocated per-thread + * to reduce stack consumption. + */ +struct filter_thread_info { + const struct lu_env *fti_env; + + /* request related data */ + struct obd_export *fti_exp; + __u64 fti_xid; + __u64 fti_transno; + __u32 fti_has_trans:1, /* has txn already? 
*/ + fti_no_need_trans:1; + + struct lu_fid fti_fid; + struct lu_attr fti_attr; + struct ldlm_res_id fti_resid; + + union { + char ns_name[48]; /* for obdfilter_init0() */ + struct lustre_cfg_bufs bufs; /* for obdfilter_stack_fini()*/ + struct kstatfs ksfs; /* for obdfilter_statfs() */ + } fti_u; + + /* server and client data buffers */ + struct lr_server_data fti_fsd; + struct filter_client_data fti_fcd; + struct lu_buf fti_buf; + loff_t fti_off; + + /* Ops object filename */ + struct lu_name fti_name; +}; + +extern struct lu_context_key filter_txn_thread_key; +extern struct lu_context_key filter_thread_key; + +static inline struct filter_thread_info * filter_info(const struct lu_env *env) +{ + struct filter_thread_info *info; + + info = lu_context_key_get(&env->le_ctx, &filter_thread_key); + LASSERT(info); + LASSERT(info->fti_env); + LASSERT(info->fti_env == env); + return info; +} + +static inline +struct filter_thread_info * filter_info_init(const struct lu_env *env, + struct obd_export *exp) +{ + struct filter_thread_info *info; + + info = lu_context_key_get(&env->le_ctx, &filter_thread_key); + LASSERT(info->fti_exp == 0); + LASSERT(info->fti_env == 0); + LASSERT(info->fti_attr.la_valid == 0); + + info->fti_env = env; + info->fti_exp = exp; + return info; +} + +typedef void (*filter_cb_t)(const struct filter_device *mdt, __u64 transno, + void *data, int err); +struct filter_commit_cb { + filter_cb_t filter_cb_func; + void *filter_cb_data; +}; + +/* + * Info allocated per-transaction. 
+ */ +#define OFD_MAX_COMMIT_CB 4 +struct filter_txn_info { + __u64 txi_transno; + unsigned int txi_cb_count; + struct filter_commit_cb txi_cb[OFD_MAX_COMMIT_CB]; +}; + +static inline void filter_trans_add_cb(const struct thandle *th, + filter_cb_t cb_func, void *cb_data) +{ + struct filter_txn_info *txi; + + txi = lu_context_key_get(&th->th_ctx, &filter_txn_thread_key); + LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb)); + + /* add new callback */ + txi->txi_cb[txi->txi_cb_count].filter_cb_func = cb_func; + txi->txi_cb[txi->txi_cb_count].filter_cb_data = cb_data; + txi->txi_cb_count++; +} + + +extern void target_recovery_fini(struct obd_device *obd); +extern void target_recovery_init(struct obd_device *obd, + svc_handler_t handler); + +static inline int filter_export_stats_init(struct filter_device *ofd, + struct obd_export *exp, void *data) +{ + return 0; +} + +/* filter_capa.c */ +int filter_update_capa_key(struct filter_device *, struct lustre_capa_key *); +int filter_auth_capa(struct filter_device *, struct lu_fid *, __u64, + struct lustre_capa *, __u64); +void filter_free_capa_keys(struct filter_device *ofd); + +/* filter_obd.c */ +int filter_setattr(struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti); +int filter_destroy(struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md *md, + struct obd_trans_info *oti, struct obd_export *md_exp, + void *capa); + +/* filter_lvb.c */ +extern struct ldlm_valblock_ops filter_lvbo; + + +/* filter_io.c */ +int filter_preprw(int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *nr_local, struct niobuf_local *res, + struct obd_trans_info *oti, struct lustre_capa *capa); +int filter_commitrw(int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int npages, struct niobuf_local *res, + struct obd_trans_info *oti, int rc); +int filter_brw(int cmd, struct 
obd_export *, struct obd_info *oinfo, + obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *); +void flip_into_page_cache(struct inode *inode, struct page *new_page); + +/* filter_io_*.c */ +struct filter_iobuf; +struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *, int rw, + int num_pages); +void filter_free_iobuf(struct filter_iobuf *iobuf); +int filter_iobuf_add_page(struct obd_device *obd, struct filter_iobuf *iobuf, + struct inode *inode, struct page *page); +void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti); +void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf, + struct obd_trans_info *oti); +int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf, + struct obd_export *exp, struct iattr *attr, + struct obd_trans_info *oti, void **wait_handle); +int filter_clear_truncated_page(struct inode *inode); + +/* filter_log.c */ + +struct ost_filterdata { + __u32 ofd_epoch; +}; +int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg, + struct obd_device *tgt, int count, + struct llog_catid *catid, struct obd_uuid *uuid); +int filter_llog_finish(struct obd_device *obd, int count); +int filter_log_sz_change(struct llog_handle *cathandle, + struct ll_fid *mds_fid, + __u32 ioepoch, + struct llog_cookie *logcookie, + struct inode *inode); +void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, + void *cb_data, int error); +int filter_recov_log_mds_ost_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data); +struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group); +struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group); + +extern struct ldlm_valblock_ops filter_lvbo; + +/* filter_recovery.c */ +struct thandle *filter_trans_create(const struct lu_env *env, + struct filter_device *ofd); +int filter_trans_start(const struct lu_env *env, + struct filter_device *ofd, struct thandle *th); +void 
filter_trans_stop(const struct lu_env *env, + struct filter_device *ofd, struct thandle *th); +int filter_client_free(struct lu_env *env, struct obd_export *exp); +int filter_client_new(const struct lu_env *env, struct filter_device *ofd, + struct filter_export_data *fed); +int filter_client_add(const struct lu_env *env, struct filter_device *ofd, + struct filter_export_data *fed, int cl_idx); +int filter_fs_setup(const struct lu_env *env, struct filter_device *ofd, + struct obd_device *obd); +void filter_fs_cleanup(const struct lu_env *env, struct filter_device *ofd); + +/* filter_fs.c */ +obd_id filter_last_id(struct filter_device *ofd, obd_gr group); +void filter_last_id_set(struct filter_device *ofd, obd_id id, obd_gr group); +int filter_last_id_write(const struct lu_env *env, struct filter_device *ofd, + obd_gr group, int force_sync); +int filter_last_id_read(const struct lu_env *env, struct filter_device *ofd, + obd_gr group); +int filter_groups_init(const struct lu_env *env, struct filter_device *ofd); +int filter_last_rcvd_header_write(const struct lu_env *env, + struct filter_device *ofd, + struct thandle *th); +int filter_last_rcvd_write(const struct lu_env *env, + struct filter_device *ofd, + struct lsd_client_data *lcd, + loff_t *off, struct thandle *th); +int filter_server_data_init(const struct lu_env *env, + struct filter_device *ofd); +int filter_server_data_update(const struct lu_env *env, + struct filter_device *ofd); + +/* filter_objects.c */ +struct filter_object *filter_object_find(const struct lu_env *env, + struct filter_device *ofd, + const struct lu_fid *fid); +struct +filter_object *filter_object_find_or_create(const struct lu_env *env, + struct filter_device *ofd, + const struct lu_fid *fid, + struct lu_attr *attr); + +void filter_object_put(const struct lu_env *env, struct filter_object *fo); +int filter_attr_set(const struct lu_env *env, struct filter_object *fo, + const struct lu_attr *la); +int filter_object_punch(const struct lu_env 
*env, struct filter_object *fo, + __u64 start, __u64 end, struct obdo *oa); +int filter_object_destroy(const struct lu_env *env, struct filter_object *fo); +int filter_attr_get(const struct lu_env *env, struct filter_object *fo, + struct lu_attr *la); + +/* filter_grants.c */ +void filter_grant_discard(struct obd_export *exp); +void filter_grant_sanity_check(struct obd_device *obd, const char *func); +void filter_grant_incoming(struct obd_export *exp, struct obdo *oa); +obd_size filter_grant_space_left(const struct lu_env *env, + struct obd_export *exp); +int filter_grant_client_calc(struct obd_export *exp, obd_size *left, + unsigned long *used, unsigned long *ungranted); +int filter_grant_check(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *objs, + struct niobuf_remote *rnb, struct niobuf_local *lnb, + obd_size *left, unsigned long *used, unsigned long *ungranted); +long filter_grant(const struct lu_env *env, struct obd_export *exp, + obd_size current_grant, obd_size want, + obd_size fs_space_left); +void filter_grant_commit(struct obd_export *exp, int niocount, + struct niobuf_local *res); + +/* IDIF stuff */ +#include +static inline void lu_idif_build(struct lu_fid *fid, obd_id id, obd_gr gr) +{ + LASSERT((id >> 48) == 0); + fid->f_seq = (0x200000000ULL | id >> 32); + fid->f_oid = (__u32)(id & 0xffffffff); + fid->f_ver = gr; +} + +static inline obd_id lu_idif_id(const struct lu_fid *fid) +{ + return ((fid->f_seq & 0xffff) << 32) | fid->f_oid; +} + +static inline obd_gr lu_idif_gr(const struct lu_fid * fid) +{ + return fid->f_ver; +} + +static inline struct ldlm_res_id * lu_idif_resid(const struct lu_fid *fid, + struct ldlm_res_id *name) +{ + name->name[LUSTRE_RES_ID_SEQ_OFF] = lu_idif_id(fid); + name->name[LUSTRE_RES_ID_OID_OFF] = 0; + name->name[LUSTRE_RES_ID_VER_OFF] = lu_idif_gr(fid); + name->name[LUSTRE_RES_ID_HSH_OFF] = 0; + return name; +} + +static inline void lu_idif_from_resid(struct lu_fid *fid, + 
const struct ldlm_res_id *name) +{ + lu_idif_build(fid, name->name[LUSTRE_RES_ID_SEQ_OFF], + name->name[LUSTRE_RES_ID_VER_OFF]); +} + +#endif /* _FILTER_INTERNAL_H */ diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c new file mode 100644 index 0000000..99f6883 --- /dev/null +++ b/lustre/ofd/ofd_io.c @@ -0,0 +1,378 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/ofd_io.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Alex Tomas + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. 
+ */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include + +#include "ofd_internal.h" + +static int filter_preprw_read(const struct lu_env *env, + struct filter_device *ofd, struct lu_fid *fid, + struct lu_attr *la, int niocount, + struct niobuf_remote *nb, int *nr_local, + struct niobuf_local *res) +{ + struct filter_object *fo; + int i, j, rc = 0; + LASSERT(env != NULL); + + fo = filter_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + + if (filter_object_exists(fo)) { + /* parse remote buffers to local buffers + and prepare the latter */ + for (i = 0, j = 0; i < niocount; i++) { + rc = dt_bufs_get(env, filter_object_child(fo), + nb + i, res + j); + LASSERT(rc > 0); + LASSERT(rc < PTLRPC_MAX_BRW_PAGES); + /* correct index for local buffers to continue with */ + j += rc; + LASSERT(j <= PTLRPC_MAX_BRW_PAGES); + } + *nr_local = j; + LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES); + rc = dt_attr_get(env, filter_object_child(fo), la, + filter_object_capa(env, fo)); + LASSERT(rc == 0); + rc = dt_read_prep(env, filter_object_child(fo), res, + *nr_local); + } else { + /* CROW object. 
We have to simulate empty buffers */ + for (i = 0, j = 0; i < niocount; i++) { + res[j].file_offset = nb[i].offset; + res[j].page_offset = 0; + res[j].len = 0; + res[j].page = NULL; + res[j].lnb_grant_used = 0; + res[j].bytes = 0; + res[j].rc = 0; + j++; + LASSERT(j <= PTLRPC_MAX_BRW_PAGES); + } + *nr_local = j; + filter_attr_get(env, fo, la); + } + + filter_object_put(env, fo); + RETURN(rc); +} + +static int filter_preprw_write(const struct lu_env *env, struct obd_export *exp, + struct filter_device *ofd, struct lu_fid *fid, + struct lu_attr *la, struct obdo *oa, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *nr_local, + struct niobuf_local *res) +{ + unsigned long used = 0, ungranted = 0; + obd_size left; + struct filter_object *fo; + int i, j, rc = 0; + + ENTRY; + LASSERT(env != NULL); + + fo = filter_object_find_or_create(env, ofd, fid, la); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + LASSERT(filter_object_exists(fo)); + + /* parse remote buffers to local buffers and prepare the latter */ + for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) { + rc = dt_bufs_get(env, filter_object_child(fo), + nb + i, res + j); + LASSERT(rc > 0); + LASSERT(rc < PTLRPC_MAX_BRW_PAGES); + /* correct index for local buffers to continue with */ + j += rc; + LASSERT(j <= PTLRPC_MAX_BRW_PAGES); + } + *nr_local = j; + LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES); + + spin_lock(&exp->exp_obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + left = filter_grant_space_left(env, exp); + + rc = filter_grant_check(env, exp, oa, objcount, obj, nb, + res, &left, &used, &ungranted); + + rc = dt_write_prep(env, filter_object_child(fo), res, *nr_local, &used); + + rc = filter_grant_client_calc(exp, &left, &used, &ungranted); + + /* do not zero out oa->o_valid as it is used in + * * filter_commitrw_write() for setting UID/GID and + * * fid EA in first write time. 
*/ + if (oa->o_valid & OBD_MD_FLGRANT) + oa->o_grant = filter_grant(env, exp, oa->o_grant, + oa->o_undirty, left); + spin_unlock(&exp->exp_obd->obd_osfs_lock); + filter_object_put(env, fo); + RETURN(rc); +} + +int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, + struct obd_ioobj *obj, struct niobuf_remote *nb, + int *nr_local, struct niobuf_local *res, + struct obd_trans_info *oti, struct lustre_capa *capa) +{ + struct lu_env env; + struct filter_device *ofd = filter_exp(exp); + struct filter_thread_info *info; + int rc = 0; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + info = filter_info_init(&env, exp); + + LASSERT(objcount == 1); + LASSERT(obj->ioo_bufcnt > 0); + + lu_idif_build(&info->fti_fid, obj->ioo_id, obj->ioo_gr); + + if (cmd == OBD_BRW_WRITE) { + rc = filter_auth_capa(ofd, &info->fti_fid, obdo_mdsno(oa), + capa, CAPA_OPC_OSS_WRITE); + if (rc == 0) { + LASSERT(oa != NULL); + la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR); + /* XXX: shouldn't we get this from odbo? 
*/ + info->fti_attr.la_valid = LA_TYPE|LA_MODE; + info->fti_attr.la_mode = S_IFREG | 0666; + + rc = filter_preprw_write(&env, exp, ofd, &info->fti_fid, + &info->fti_attr, oa, objcount, + obj, nb, nr_local, + res); + } + } else if (cmd == OBD_BRW_READ) { + rc = filter_auth_capa(ofd, &info->fti_fid, obdo_mdsno(oa), + capa, CAPA_OPC_OSS_READ); + if (rc == 0) { + if (oa && oa->o_valid & OBD_MD_FLGRANT) { + struct obd_device *obd = filter_obd(ofd); + spin_lock(&obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + + oa->o_grant = 0; + spin_unlock(&obd->obd_osfs_lock); + } + rc = filter_preprw_read(&env, ofd, &info->fti_fid, + &info->fti_attr, obj->ioo_bufcnt, + nb, nr_local, res); + obdo_from_la(oa, &info->fti_attr, LA_ATIME); + } + } else { + LBUG(); + rc = -EPROTO; + } + lu_env_fini(&env); + RETURN(rc); +} + +static int +filter_commitrw_read(const struct lu_env *env, struct filter_device *ofd, + struct lu_fid *fid, int objcount, int niocount, + struct niobuf_local *res) +{ + struct filter_object *fo; + ENTRY; + + LASSERT(niocount > 0); + + fo = filter_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + if (filter_object_exists(fo)) { + dt_bufs_put(env, filter_object_child(fo), res, niocount); + } else { + /* CROW object, do nothing */ + } + + filter_object_put(env, fo); + RETURN(0); +} + +static int +filter_commitrw_write(const struct lu_env *env, struct filter_device *ofd, + struct lu_fid *fid, struct lu_attr *la, int objcount, + int niocount, struct niobuf_local *res, int old_rc) +{ + struct filter_thread_info *info = filter_info(env); + struct filter_object *fo; + struct thandle *th; + int rc = 0; + ENTRY; + + LASSERT(objcount == 1); + + fo = filter_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + LASSERT(filter_object_exists(fo)); + if (old_rc) + GOTO(out, rc = old_rc); + + /* XXX: need 1 here until support on client for async writes */ +#if 0 + info->fti_txn_param.tp_sync = 
0; +#endif + + th = filter_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + rc = dt_declare_write_commit(env, filter_object_child(fo), + res, niocount, th); + LASSERT(rc == 0); + + if (la->la_valid) { + rc = dt_declare_attr_set(env, filter_object_child(fo), la, + th, filter_object_capa(env, fo)); + LASSERT(rc == 0); + } + + rc = filter_trans_start(env, ofd, th); + if (rc) { + /* the handle created above is still ours: release it + * before bailing out, otherwise it is leaked. + * NOTE(review): assumes filter_trans_stop() accepts a + * handle that failed to start -- confirm. */ + filter_trans_stop(env, ofd, th); + GOTO(out, rc); + } + + rc = dt_write_commit(env, filter_object_child(fo), res, niocount, th); + LASSERT(rc == 0); + + if (la->la_valid) { + rc = dt_attr_set(env, filter_object_child(fo), la, th); + LASSERT(rc == 0); + } + + filter_trans_stop(env, ofd, th); + + /* get attr to return */ + dt_attr_get(env, filter_object_child(fo), la, + filter_object_capa(env, fo)); +out: + /* common exit: settle grant accounting and drop buffers/object + * whether or not the write was committed */ + filter_grant_commit(info->fti_exp, niocount, res); + dt_bufs_put(env, filter_object_child(fo), res, niocount); + filter_object_put(env, fo); + + RETURN(rc); +} + +int filter_commitrw(int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int npages, struct niobuf_local *res, + struct obd_trans_info *oti, int old_rc) +{ + struct filter_device *ofd = filter_exp(exp); + struct filter_thread_info *info; + struct filter_mod_data *fmd; + struct lu_env env; + int rc = 0; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + info = filter_info_init(&env, exp); + + LASSERT(npages > 0); + + lu_idif_build(&info->fti_fid, obj->ioo_id, obj->ioo_gr); + if (cmd == OBD_BRW_WRITE) { + /* Don't update timestamps if this write is older than a + * setattr which modifies the timestamps. b=10150 */ + + /* XXX when we start having persistent reservations this needs + * to be changed to filter_fmd_get() to create the fmd if it + * doesn't already exist so we can store the reservation handle + * there. 
*/ + fmd = filter_fmd_find(exp, &info->fti_fid); + if (!fmd || fmd->fmd_mactime_xid < info->fti_xid) { + la_from_obdo(&info->fti_attr, oa, + OBD_MD_FLATIME | OBD_MD_FLMTIME | + OBD_MD_FLCTIME); + } else { + info->fti_attr.la_valid = 0; + } + filter_fmd_put(exp, fmd); + + rc = filter_commitrw_write(&env, ofd, &info->fti_fid, + &info->fti_attr, objcount, + npages, res, old_rc); + if (rc == 0) + obdo_from_la(oa, &info->fti_attr, + FILTER_VALID_FLAGS | LA_GID | LA_UID); + else + obdo_from_la(oa, &info->fti_attr, LA_GID | LA_UID); + if (old_rc == 0) { +#if 0 + /* update per-buffer error codes */ + if (rcs != NULL) { + memset(rcs, 0, npages * sizeof(__u32)); + /* XXX: update rcs */ + /* for (i = 0; i < npages; i++) + if (res[i].rc < 0) + rcs[res[i].rindex] = res[i].rc; + */ + } +#endif + } + } else if (cmd == OBD_BRW_READ) { + struct ldlm_namespace *ns = ofd->ofd_namespace; + + /* If oa != NULL then filter_preprw_read updated the inode + * atime and we should update the lvb so that other glimpses + * will also get the updated value. bug 5972 */ + if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) { + struct ldlm_resource *rs = NULL; + + lu_idif_resid(&info->fti_fid, &info->fti_resid); + rs = ldlm_resource_get(ns, NULL, &info->fti_resid, + LDLM_EXTENT, 0); + if (rs != NULL) { + ns->ns_lvbo->lvbo_update(rs, NULL, 0, 1); + ldlm_resource_putref(rs); + } + } + rc = filter_commitrw_read(&env, ofd, &info->fti_fid, objcount, + npages, res); + } else { + LBUG(); + rc = -EPROTO; + } + lu_env_fini(&env); + RETURN(rc); +} diff --git a/lustre/ofd/ofd_llog.c b/lustre/ofd/ofd_llog.c new file mode 100644 index 0000000..a6a71bd --- /dev/null +++ b/lustre/ofd/ofd_llog.c @@ -0,0 +1,374 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/ofd_llog.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Alex Tomas + * + * This file is part of Lustre, http://www.lustre.org. 
+ * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#if 0 +#include +#endif +#include "ofd_internal.h" + +#define OBD_LLOG_GROUP 0 + +static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/; +#if 0 +static struct llog_operations filter_size_orig_logops = { + lop_setup: llog_obd_origin_setup, + lop_cleanup: llog_obd_origin_cleanup, + lop_add: llog_obd_origin_add +}; +#endif + +int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg, + struct obd_device *tgt, int count, + struct llog_catid *catid, struct obd_uuid *uuid) +{ + struct filter_device *ofd = filter_dev(obd->obd_lu_dev); + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + if (olg->olg_group == OBD_LLOG_GROUP) { + LASSERT(ofd->ofd_lcm == NULL); + ofd->ofd_lcm = llog_recov_thread_init(obd->obd_name); + if (!ofd->ofd_lcm) + RETURN(-ENOMEM); + + filter_mds_ost_repl_logops = llog_client_ops; + filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel; + filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect; + filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync; + } else { + LASSERT(ofd->ofd_lcm != NULL); + } + rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL, + &filter_mds_ost_repl_logops); + if (rc) + GOTO(cleanup, rc); + + /* FIXME - assign unlink_cb for filter's recovery */ + LASSERT(olg); + ctxt = 
llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); + + LASSERT(ctxt != NULL); + ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb; + ctxt->loc_lcm = ofd->ofd_lcm; + llog_ctxt_put(ctxt); + +cleanup: + if (rc) { + llog_recov_thread_fini(ofd->ofd_lcm, 1); + ofd->ofd_lcm = NULL; + } + RETURN(rc); +} + +static int filter_group_llog_finish(struct obd_llog_group *olg) +{ + struct llog_ctxt *ctxt; + int rc = 0, rc2 = 0; + ENTRY; + + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); + if (ctxt) + rc = llog_cleanup(ctxt); + + ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; + + RETURN(rc); +} + +int filter_llog_finish(struct obd_device *obd, int count) +{ + struct filter_device *ofd = filter_dev(obd->obd_lu_dev); + int rc; + ENTRY; + + if (ofd->ofd_lcm) { + llog_recov_thread_fini(ofd->ofd_lcm, obd->obd_force); + ofd->ofd_lcm = NULL; + } + /* finish obd llog group */ + rc = filter_group_llog_finish(&obd->obd_olg); + + RETURN(rc); +} + + +struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group) +{ + struct filter_device *ofd = filter_dev(obd->obd_lu_dev); + struct obd_llog_group *olg, *nolg; + int rc; + + if (group == OBD_LLOG_GROUP) + RETURN(&obd->obd_olg); + + spin_lock(&ofd->ofd_llog_list_lock); + list_for_each_entry(olg, &ofd->ofd_llog_list, olg_list) { + if (olg->olg_group == group) { + spin_unlock(&ofd->ofd_llog_list_lock); + RETURN(olg); + } + } + spin_unlock(&ofd->ofd_llog_list_lock); + + OBD_ALLOC_PTR(olg); + if (olg == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + llog_group_init(olg, group); + spin_lock(&ofd->ofd_llog_list_lock); + list_for_each_entry(nolg, &ofd->ofd_llog_list, olg_list) { + if (nolg->olg_group == group) { + spin_unlock(&ofd->ofd_llog_list_lock); + OBD_FREE_PTR(olg); + RETURN(nolg); + } + } + list_add(&olg->olg_list, &ofd->ofd_llog_list); + spin_unlock(&ofd->ofd_llog_list_lock); + + rc = llog_cat_initialize(obd, olg, 1, NULL); + if (rc) { + 
spin_lock(&ofd->ofd_llog_list_lock); + list_del(&olg->olg_list); + spin_unlock(&ofd->ofd_llog_list_lock); + OBD_FREE_PTR(olg); + RETURN(ERR_PTR(rc)); + } + CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n", + obd->obd_name, group, olg); + + RETURN(olg); +} + +/* Callback for processing the setattr log record received from MDS by + * llog_client_api. + * + * Builds a temporary obdo from the record (object id, group, uid, gid and + * the llog cookie) and hands it to filter_setattr() on the self export. + * -ENOENT from filter_setattr() is turned into success after the cookie + * has been cancelled; any other result is returned unchanged. */ +static int filter_recov_log_setattr_cb(struct llog_ctxt *ctxt, + struct llog_rec_hdr *rec, + struct llog_cookie *cookie) +{ + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct llog_setattr_rec *lsr; + struct obd_info oinfo = { { { 0 } } }; + obd_id oid; + int rc = 0; + ENTRY; + + lsr = (struct llog_setattr_rec *)rec; + OBDO_ALLOC(oinfo.oi_oa); + if (oinfo.oi_oa == NULL) + RETURN(-ENOMEM); + + oinfo.oi_oa->o_valid |= (OBD_MD_FLID | OBD_MD_FLUID | OBD_MD_FLGID | + OBD_MD_FLCOOKIE); + oinfo.oi_oa->o_id = lsr->lsr_oid; + oinfo.oi_oa->o_gr = lsr->lsr_ogen; + /* OR the group flag in: a plain assignment here would wipe the + * UID/GID/COOKIE bits that were set just above. */ + oinfo.oi_oa->o_valid |= OBD_MD_FLGROUP; + oinfo.oi_oa->o_uid = lsr->lsr_uid; + oinfo.oi_oa->o_gid = lsr->lsr_gid; + oinfo.oi_oa->o_lcookie = *cookie; + /* remember the id for the debug message, the obdo is freed below */ + oid = oinfo.oi_oa->o_id; + + rc = filter_setattr(exp, &oinfo, NULL); + OBDO_FREE(oinfo.oi_oa); + + if (rc == -ENOENT) { + CDEBUG(D_HA, "object already removed, send cookie\n"); + llog_cancel(ctxt, NULL, 1, cookie, 0); + GOTO(exit, rc = 0); + } + + if (rc == 0) + CDEBUG(D_HA, "object: "LPU64" in record is chown/chgrp\n", oid); + +exit: + RETURN(rc); +} + +/* Callback for processing the unlink log record received from MDS by + * llog_client_api. 
*/ +int filter_recov_log_unlink_cb(struct llog_ctxt *ctxt, + struct llog_rec_hdr *rec, + struct llog_cookie *cookie) +{ + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct llog_unlink_rec *lur; + struct obdo *oa; + obd_id oid; + int rc = 0; + ENTRY; + + lur = (struct llog_unlink_rec *)rec; + OBDO_ALLOC(oa); + if (oa == NULL) + RETURN(-ENOMEM); + oa->o_valid |= OBD_MD_FLCOOKIE; + oa->o_id = lur->lur_oid; + oa->o_gr = lur->lur_ogen; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + oa->o_lcookie = *cookie; + oid = oa->o_id; + + rc = filter_destroy(exp, oa, NULL, NULL, NULL, NULL); + OBDO_FREE(oa); + if (rc == -ENOENT) { + CDEBUG(D_HA, "object already removed, send cookie\n"); + llog_cancel(ctxt, NULL, 1, cookie, 0); + GOTO(exit, rc = 0); + } + + if (rc == 0) + CDEBUG(D_HA, "object: "LPU64" in record is destroyed\n", oid); + +exit: + RETURN(rc); +} + +int filter_recov_log_mds_ost_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) +{ + struct llog_ctxt *ctxt = llh->lgh_ctxt; + struct llog_cookie cookie; + int rc = 0; + ENTRY; + + if (ctxt->loc_obd->obd_stopping) + RETURN(LLOG_PROC_BREAK); + + if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) { + CERROR("log is not plain\n"); + RETURN(-EINVAL); + } + + OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT, 30); + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; + cookie.lgc_index = rec->lrh_index; + + switch (rec->lrh_type) { + case MDS_UNLINK_REC: + rc = filter_recov_log_unlink_cb(ctxt, rec, &cookie); + break; + case MDS_SETATTR_REC: + rc = filter_recov_log_setattr_cb(ctxt, rec, &cookie); + break; + case LLOG_GEN_REC: { + struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec; + if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen)) + rc = 0; + else + rc = LLOG_PROC_BREAK; + CDEBUG(D_HA, "fetch generation log, send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(rc); + } + break; + default: + CERROR("log record type %08x 
unknown\n", rec->lrh_type); + RETURN(-EINVAL); + break; + } + + RETURN(rc); +} + +static struct obd_llog_group * +filter_find_olg_internal(struct filter_obd *filter, int group) +{ + struct obd_llog_group *olg; + + LASSERT_SPIN_LOCKED(&filter->fo_llog_list_lock); + list_for_each_entry(olg, &filter->fo_llog_list, olg_list) { + if (olg->olg_group == group) + RETURN(olg); + } + RETURN(NULL); +} + + +/** + * Find the llog_group of the filter according to the group. If it can not + * find, create the llog_group, which only happens when mds is being synced + * with OST. + */ +struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group) +{ + struct obd_llog_group *olg = NULL; + struct filter_obd *filter; + int rc; + + filter = &obd->u.filter; + + if (group == FILTER_GROUP_LLOG) + RETURN(&obd->obd_olg); + + spin_lock(&filter->fo_llog_list_lock); + olg = filter_find_olg_internal(filter, group); + if (olg) { + if (olg->olg_initializing) { + GOTO(out_unlock, olg = ERR_PTR(-EBUSY)); + } else { + GOTO(out_unlock, olg); + } + } + OBD_ALLOC_PTR(olg); + if (olg == NULL) + GOTO(out_unlock, olg = ERR_PTR(-ENOMEM)); + + llog_group_init(olg, group); + list_add(&olg->olg_list, &filter->fo_llog_list); + olg->olg_initializing = 1; + spin_unlock(&filter->fo_llog_list_lock); + + rc = llog_cat_initialize(obd, olg, 1, NULL); + if (rc) { + spin_lock(&filter->fo_llog_list_lock); + list_del(&olg->olg_list); + spin_unlock(&filter->fo_llog_list_lock); + OBD_FREE_PTR(olg); + GOTO(out, olg = ERR_PTR(-ENOMEM)); + } + spin_lock(&filter->fo_llog_list_lock); + olg->olg_initializing = 0; + spin_unlock(&filter->fo_llog_list_lock); + CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n", + obd->obd_name, group, olg); +out: + RETURN(olg); + +out_unlock: + spin_unlock(&filter->fo_llog_list_lock); + GOTO(out, olg); +} + diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c new file mode 100644 index 0000000..daa09c4 --- /dev/null +++ b/lustre/ofd/ofd_lvb.c @@ -0,0 +1,241 @@ +/* -*- mode: c; 
c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/ofd_lvb.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Mike Pershin + * Author: Alex Tomas + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. 
+ */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include + +#include "ofd_internal.h" + +/* Called with res->lr_lvb_sem held */ +static int filter_lvbo_init(struct ldlm_resource *res) +{ + struct ost_lvb *lvb = NULL; + struct filter_device *ofd; + struct filter_object *fo; + struct filter_thread_info *info; + struct lu_env env; + int rc = 0; + ENTRY; + + LASSERT(res); + + /* we only want lvb's for object resources */ + /* check for internal locks: these have name[1] != 0 */ + if (res->lr_name.name[1]) + RETURN(0); + + if (res->lr_lvb_data) + RETURN(0); + + ofd = res->lr_namespace->ns_lvbp; + LASSERT(ofd != NULL); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + + OBD_ALLOC_PTR(lvb); + if (lvb == NULL) + GOTO(out, rc = -ENOMEM); + + info = filter_info_init(&env, NULL); + lu_idif_from_resid(&info->fti_fid, &res->lr_name); + + fo = filter_object_find(&env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + OBD_FREE_PTR(lvb); + GOTO(out, rc = PTR_ERR(fo)); + } + + rc = filter_attr_get(&env, fo, &info->fti_attr); + filter_object_put(&env, fo); + if (rc == 0) { + lvb->lvb_size = info->fti_attr.la_size; + lvb->lvb_blocks = info->fti_attr.la_blocks; + lvb->lvb_mtime = info->fti_attr.la_mtime; + lvb->lvb_atime = info->fti_attr.la_atime; + lvb->lvb_ctime = info->fti_attr.la_ctime; + } else { + OBD_FREE_PTR(lvb); + GOTO(out, rc); + } + + res->lr_lvb_data = lvb; + res->lr_lvb_len = sizeof(*lvb); + + CDEBUG(D_DLMTRACE, "res: "LPX64" initial lvb size: "LPX64", " + "mtime: "LPX64", blocks: "LPX64"\n", + res->lr_name.name[0], lvb->lvb_size, + lvb->lvb_mtime, lvb->lvb_blocks); + + EXIT; + +out: + lu_env_fini(&env); + + if (rc) + OST_LVB_SET_ERR(lvb->lvb_blocks, rc); + /* Don't free lvb data on lookup error */ + return rc; +} + +/* This will be called in two ways: + * + * m != NULL : called by the DLM itself after a glimpse callback + * m == NULL : called by the filter after a disk write + * + * If 'increase_only' is true, don't allow values to move backwards. 
+ */ +static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, + int buf_idx, int increase_only) +{ + struct filter_device *ofd; + struct filter_object *fo; + struct filter_thread_info *info; + struct ost_lvb *lvb; + struct lu_env env; + int rc = 0; + ENTRY; + + LASSERT(res); + + /* we only want lvb's for object resources */ + /* check for internal locks: these have name[1] != 0 */ + if (res->lr_name.name[1]) + RETURN(0); + + mutex_down(&res->lr_lvb_sem); + lvb = res->lr_lvb_data; + if (lvb == NULL) { + CERROR("No lvb when running lvbo_update!\n"); + GOTO(out_mutex, rc = 0); + } + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + GOTO(out_mutex, rc); + + info = filter_info_init(&env, NULL); + /* Update the LVB from the network message */ + if (m != NULL) { + struct ost_lvb *new; + + new = lustre_swab_buf(m, buf_idx, sizeof(*new), + lustre_swab_ost_lvb); + if (new == NULL) { + CERROR("lustre_swab_buf failed\n"); + goto disk_update; + } + if (new->lvb_size > lvb->lvb_size || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_size, new->lvb_size); + lvb->lvb_size = new->lvb_size; + } + if (new->lvb_mtime > lvb->lvb_mtime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_mtime, new->lvb_mtime); + lvb->lvb_mtime = new->lvb_mtime; + } + if (new->lvb_atime > lvb->lvb_atime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_atime, new->lvb_atime); + lvb->lvb_atime = new->lvb_atime; + } + if (new->lvb_ctime > lvb->lvb_ctime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_ctime, new->lvb_ctime); + lvb->lvb_ctime = new->lvb_ctime; + } + } + + disk_update: + /* Update the LVB from the disk inode */ + ofd = 
res->lr_namespace->ns_lvbp; + LASSERT(ofd != NULL); + + lu_idif_from_resid(&info->fti_fid, &res->lr_name); + + fo = filter_object_find(&env, ofd, &info->fti_fid); + if (IS_ERR(fo)) + GOTO(out_env, rc = PTR_ERR(fo)); + + rc = filter_attr_get(&env, fo, &info->fti_attr); + if (rc) + GOTO(out_obj, rc); + + if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: " + LPU64" -> %llu\n", res->lr_name.name[0], + lvb->lvb_size, info->fti_attr.la_size); + lvb->lvb_size = info->fti_attr.la_size; + } + + if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_mtime, info->fti_attr.la_mtime); + lvb->lvb_mtime = info->fti_attr.la_mtime; + } + if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime from disk: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_atime, info->fti_attr.la_atime); + lvb->lvb_atime = info->fti_attr.la_atime; + } + if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) { + CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime from disk: " + LPU64" -> "LPU64"\n", res->lr_name.name[0], + lvb->lvb_ctime, info->fti_attr.la_ctime); + lvb->lvb_ctime = info->fti_attr.la_ctime; + } + if (lvb->lvb_blocks != info->fti_attr.la_blocks) { + CDEBUG(D_DLMTRACE,"res: "LPU64" updating lvb blocks from disk: " + LPU64" -> %llu\n", res->lr_name.name[0], + lvb->lvb_blocks, + (unsigned long long)info->fti_attr.la_blocks); + lvb->lvb_blocks = info->fti_attr.la_blocks; + } + +out_obj: + filter_object_put(&env, fo); +out_env: + lu_env_fini(&env); +out_mutex: + mutex_up(&res->lr_lvb_sem); + return rc; +} + +struct ldlm_valblock_ops filter_lvbo = { + lvbo_init: filter_lvbo_init, + lvbo_update: filter_lvbo_update +}; diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c new file mode 100644 index 
0000000..0e70444 --- /dev/null +++ b/lustre/ofd/ofd_obd.c @@ -0,0 +1,1057 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/ofd_obd.c + * + * Copyright (c) 2001-2007 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Alex Tomas + * Author: Mike Pershin + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include + +#include "ofd_internal.h" + +static inline void filter_oti2info(struct filter_thread_info *info, + struct obd_trans_info *oti) +{ + info->fti_xid = oti->oti_xid; + info->fti_transno = oti->oti_transno; +} + +static inline void filter_info2oti(struct filter_thread_info *info, + struct obd_trans_info *oti) +{ + oti->oti_xid = info->fti_xid; + oti->oti_transno = info->fti_transno; +} + +static int filter_obd_notify(struct obd_device *host, + struct obd_device *watched, + enum obd_notify_event ev, void *owner) +{ + ENTRY; + + switch (ev) { + case OBD_NOTIFY_CONFIG: + host->obd_no_conn = 0; + default: + CDEBUG(D_INFO, "Notification 0x%x\n", ev); + } + RETURN(0); +} + +static int filter_parse_connect_data(const struct lu_env *env, + struct obd_export *exp, + struct obd_connect_data *data) +{ + struct filter_device *ofd = filter_exp(exp); + struct filter_export_data *fed = &exp->exp_filter_data; + int rc = 0; + + if (!data) + RETURN(0); + + CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64 + " ocd_version: %x ocd_grant: %d ocd_index: %u\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + data->ocd_connect_flags, data->ocd_version, + data->ocd_grant, data->ocd_index); + + data->ocd_connect_flags &= OST_CONNECT_SUPPORTED; + exp->exp_connect_flags = data->ocd_connect_flags; + data->ocd_version = LUSTRE_VERSION_CODE; + +#if 0 + if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) { + CWARN("%s: OST requires FID support (flag="LPX64 + "), but client not\n", + exp->exp_obd->obd_name, + exp->exp_connect_flags); + RETURN(-EBADF); + } +#endif + + if (exp->exp_connect_flags & OBD_CONNECT_GRANT) { + obd_size left, want; + + spin_lock(&exp->exp_obd->obd_osfs_lock); + left = filter_grant_space_left(env, exp); + want = data->ocd_grant; + filter_grant(env, exp, fed->fed_grant, want, left); + data->ocd_grant = fed->fed_grant; + spin_unlock(&exp->exp_obd->obd_osfs_lock); + + CDEBUG(D_CACHE, 
"%s: cli %s/%p ocd_grant: %d want: " + LPU64" left: "LPU64"\n", exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, exp, + data->ocd_grant, want, left); + } + + if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { + struct lr_server_data *lsd = &ofd->ofd_fsd; + int index = lsd->lsd_ost_index; + + if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { + /* this will only happen on the first connect */ + lsd->lsd_ost_index = data->ocd_index; + lsd->lsd_feature_compat |= OBD_COMPAT_OST; + filter_server_data_update(env, ofd); + } else if (index != data->ocd_index) { + LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" + " %u doesn't match actual OST index" + " %u in last_rcvd file, bad " + "configuration?\n", + obd_export_nid2str(exp), index, + data->ocd_index); + RETURN(-EBADF); + } + } + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) { + data->ocd_brw_size = 65536; + } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { + data->ocd_brw_size = min(data->ocd_brw_size, + (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + LASSERT(data->ocd_brw_size); + } + + if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) { + __u32 cksum_types = data->ocd_cksum_types; + + /* The client set in ocd_cksum_types the checksum types it + * supports. We have to mask off the algorithms that we don't + * support */ + if (cksum_types & OBD_CKSUM_ALL) + data->ocd_cksum_types &= OBD_CKSUM_ALL; + else + data->ocd_cksum_types = OBD_CKSUM_CRC32; + + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " + "%x\n", exp->exp_obd->obd_name, + obd_export_nid2str(exp), cksum_types, + data->ocd_cksum_types); + } else { + /* This client does not support OBD_CONNECT_CKSUM + * fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support " + "OBD_CONNECT_CKSUM, CRC32 will be used\n", + exp->exp_obd->obd_name, + obd_export_nid2str(exp)); + } + + /* FIXME: Do the same with the MDS UUID and fsd_peeruuid. 
+ * FIXME: We don't strictly need the COMPAT flag for that, + * FIXME: as fsd_peeruuid[0] will tell us if that is set. + * FIXME: We needed it for the index, as index 0 is valid. */ + + RETURN(rc); +} + +static int filter_obd_reconnect(const struct lu_env *env, struct obd_export *exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) +{ + int rc; + ENTRY; + + if (exp == NULL || obd == NULL || cluuid == NULL) + RETURN(-EINVAL); + + filter_info_init(env, exp); + rc = filter_parse_connect_data(env, exp, data); + + RETURN(rc); +} + +static int filter_obd_connect(const struct lu_env *env, struct obd_export **_exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) +{ + struct lsd_client_data *lcd = NULL; + struct filter_export_data *fed; + struct filter_thread_info *info; + struct obd_export *exp; + struct filter_device *ofd; + struct lustre_handle conn = { 0 }; + int rc, group; + ENTRY; + + if (!_exp || !obd || !cluuid) + RETURN(-EINVAL); + + ofd = filter_dev(obd->obd_lu_dev); + + rc = class_connect(&conn, obd, cluuid); + if (rc) + RETURN(rc); + + exp = class_conn2export(&conn); + LASSERT(exp != NULL); + fed = &exp->exp_filter_data; + info = filter_info_init(env, exp); + + rc = filter_parse_connect_data(env, exp, data); + if (rc) + GOTO(out, rc); + + filter_export_stats_init(ofd, exp, localdata); + group = data->ocd_group; + if (obd->obd_replayable) { + OBD_ALLOC_PTR(lcd); + if (lcd == NULL) + GOTO(out, rc = -ENOMEM); + + memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid)); + fed->fed_lcd = lcd; + + rc = filter_client_new(env, ofd, &exp->exp_filter_data); + if (rc != 0) + GOTO(out, rc); + } + if (group == 0) + GOTO(out, rc = 0); + + CWARN("%s: Received MDS connection ("LPX64"); group %d\n", + obd->obd_name, exp->exp_handle.h_cookie, group); + + /* init new group */ + if (group > ofd->ofd_max_group) { + ofd->ofd_max_group = group; + filter_last_id_set(ofd, 
FILTER_INIT_OBJID, group); + filter_last_id_write(env, ofd, group, 1); + } + +out: + if (rc != 0) { + if (lcd) { + OBD_FREE_PTR(lcd); + fed->fed_lcd = NULL; + } + class_disconnect(exp); + } else { + *_exp = exp; + //class_export_put(exp); + } + RETURN(rc); +} + +static int filter_obd_disconnect(struct obd_export *exp) +{ + struct filter_device *ofd = filter_exp(exp); + int rc; + ENTRY; + + LASSERT(exp); + class_export_get(exp); + + if (!(exp->exp_flags & OBD_OPT_FORCE)) + filter_grant_sanity_check(filter_obd(ofd), __FUNCTION__); + filter_grant_discard(exp); + + /* Disconnect early so that clients can't keep using export */ + rc = class_disconnect(exp); + if (ofd->ofd_namespace != NULL) + ldlm_cancel_locks_for_export(exp); + + /* flush any remaining cancel messages out to the target */ + //filter_sync_llogs(obd, exp); + class_export_put(exp); + RETURN(rc); +} + +static int filter_init_export(struct obd_export *exp) +{ + spin_lock_init(&exp->exp_filter_data.fed_lock); + CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); + spin_lock(&exp->exp_lock); + exp->exp_connecting = 1; + spin_unlock(&exp->exp_lock); + + return ldlm_init_export(exp); +} + +static int filter_destroy_export(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + struct filter_device *ofd = filter_dev(obd->obd_lu_dev); + struct lu_env env; + int rc; + ENTRY; + + if (exp->exp_filter_data.fed_pending) + CERROR("%s: cli %s/%p has %lu pending on destroyed export\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp, exp->exp_filter_data.fed_pending); + + /* Not ported yet the b1_6 quota functionality + * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); + */ + + target_destroy_export(exp); + + if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid)) + RETURN(0); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + + filter_info_init(&env, exp); + lprocfs_exp_cleanup(exp); + + if (obd->obd_replayable) + filter_client_free(&env, exp); + else + 
dt_sync(&env, ofd->ofd_osd); + + filter_grant_discard(exp); + /* FIXME Check if cleanup is required here once complete + * UOSS functionality is implemented. */ + filter_fmd_cleanup(exp); + + if (!(exp->exp_flags & OBD_OPT_FORCE)) + filter_grant_sanity_check(exp->exp_obd, __FUNCTION__); + + lu_env_fini(&env); + RETURN(0); +} + +static inline int filter_setup_llog_group(struct obd_export *exp, + struct obd_device *obd, + int group) +{ + struct obd_llog_group *olg; + struct llog_ctxt *ctxt; + int rc; + + olg = filter_find_create_olg(obd, group); + if (IS_ERR(olg)) + RETURN(PTR_ERR(olg)); + + llog_group_set_export(olg, exp); + + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); + LASSERTF(ctxt != NULL, "ctxt is null\n"); + + rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); + llog_ctxt_put(ctxt); + return rc; +} + +static int filter_set_info_async(struct obd_export *exp, __u32 keylen, + void *key, __u32 vallen, void *val, + struct ptlrpc_request_set *set) +{ + struct filter_device *ofd = filter_exp(exp); + struct obd_device *obd; + int rc = 0, group; + ENTRY; + + obd = exp->exp_obd; + if (obd == NULL) { + CDEBUG(D_IOCTL, "invalid export %p\n", exp); + RETURN(-EINVAL); + } + + if (KEY_IS(KEY_CAPA_KEY)) { + rc = filter_update_capa_key(ofd, (struct lustre_capa_key *)val); + if (rc) + CERROR("filter update capability key failed: %d\n", rc); + RETURN(rc); + } + + if (!KEY_IS(KEY_MDS_CONN)) + RETURN(-EINVAL); + + LCONSOLE_WARN("%s: received MDS connection from %s\n", obd->obd_name, + obd_export_nid2str(exp)); + + /* setup llog imports */ + if (val != NULL) { + group = (int)(*(__u32 *)val); + LASSERT(group >= FILTER_GROUP_MDS0); + sema_init(&ofd->ofd_create_locks[group], 1); + spin_lock(&ofd->ofd_objid_lock); + if (group > ofd->ofd_max_group) + ofd->ofd_max_group = group; + spin_unlock(&ofd->ofd_objid_lock); + } else { + /* XXX: protocol incompatibility 1.6 vs. 
1.8 */ + group = 0; + } + +#if 0 + LASSERT_MDS_GROUP(group); + rc = filter_setup_llog_group(exp, obd, group); + if (rc) + GOTO(out, rc); + + lquota_setinfo(filter_quota_interface_ref, obd, exp); + + if (group == FILTER_GROUP_MDS0) { + /* setup llog group 1 for interop */ + filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG); + } +out: +#endif + RETURN(rc); +} + +static int filter_get_info(struct obd_export *exp, __u32 keylen, void *key, + __u32 *vallen, void *val, struct lov_stripe_md *lsm) +{ + struct filter_device *ofd = filter_exp(exp); + ENTRY; + + if (exp->exp_obd == NULL) { + CDEBUG(D_IOCTL, "invalid client export %p\n", exp); + RETURN(-EINVAL); + } + + if (KEY_IS("blocksize")) { + __u32 *blocksize = val; + if (blocksize) { + if (*vallen < sizeof(*blocksize)) + RETURN(-EOVERFLOW); + *blocksize = 1 << ofd->ofd_dt_conf.ddp_block_shift; + } + *vallen = sizeof(*blocksize); + RETURN(0); + } + + if (KEY_IS("blocksize_bits")) { + __u32 *blocksize_bits = val; + if (blocksize_bits) { + if (*vallen < sizeof(*blocksize_bits)) + RETURN(-EOVERFLOW); + *blocksize_bits = ofd->ofd_dt_conf.ddp_block_shift; + } + *vallen = sizeof(*blocksize_bits); + RETURN(0); + } + + if (KEY_IS("last_id")) { + struct filter_export_data *fed = &exp->exp_filter_data; + obd_id *last_id = val; + if (last_id) { + if (*vallen < sizeof(*last_id)) + RETURN(-EOVERFLOW); + *last_id = filter_last_id(ofd, fed->fed_group); + } + *vallen = sizeof(*last_id); + RETURN(0); + } + + if (KEY_IS("FLAVOR")) { + read_lock(&ofd->ofd_sptlrpc_lock); + LBUG(); +#if 0 + sptlrpc_rule_set_choose(&ofd->ofd_sptlrpc_rset, + exp->exp_sp_peer, + exp->exp_connection->c_peer.nid, + &exp->exp_flvr); + read_unlock(&ofd->ofd_sptlrpc_lock); +#endif + RETURN(0); + } + CDEBUG(D_IOCTL, "invalid key\n"); + RETURN(-EINVAL); +} + +static int filter_statfs(struct obd_device *obd, + struct obd_statfs *osfs, __u64 max_age, __u32 flags) +{ + struct filter_device *ofd = filter_dev(obd->obd_lu_dev); + struct filter_thread_info *info; + struct 
lu_env env; + int rc, blockbits = ofd->ofd_dt_conf.ddp_block_shift; + ENTRY; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + info = filter_info_init(&env, NULL); + + /* at least try to account for cached pages. its still racey and + * might be under-reporting if clients haven't announced their + * caches with brw recently */ + rc = dt_statfs(&env, ofd->ofd_osd, &info->fti_u.ksfs);//flags? + if (rc) + GOTO(out, rc); + + statfs_pack(osfs, &info->fti_u.ksfs); + + CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64 + " pending "LPU64" free "LPU64" avail "LPU64"\n", + ofd->ofd_tot_dirty, ofd->ofd_tot_granted, ofd->ofd_tot_pending, + osfs->os_bfree << blockbits, osfs->os_bavail << blockbits); + + filter_grant_sanity_check(obd, __FUNCTION__); + osfs->os_bavail -= min(osfs->os_bavail, GRANT_FOR_LLOG + + ((ofd->ofd_tot_dirty + ofd->ofd_tot_pending + + osfs->os_bsize - 1) >> blockbits)); +#if 0 + /* set EROFS to state field if FS is mounted as RDONLY. The goal is to + * stop creating files on MDS if OST is not good shape to create + * objects.*/ + osfs->os_state = (filter->fo_obt.obt_sb->s_flags & MS_RDONLY) ? 
EROFS : 0; +#endif +out: + lu_env_fini(&env); + RETURN(rc); +} + +int filter_setattr(struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti) +{ + struct lu_env env; + struct filter_device *ofd = filter_exp(exp); + struct filter_thread_info *info; + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct filter_object *fo; + int rc = 0; + ENTRY; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + + info = filter_info_init(&env, exp); + filter_oti2info(info, oti); + + lu_idif_build(&info->fti_fid, oinfo->oi_oa->o_id, oinfo->oi_oa->o_gr); + lu_idif_resid(&info->fti_fid, &info->fti_resid); + + rc = filter_auth_capa(ofd, &info->fti_fid, oinfo_mdsno(oinfo), + oinfo_capa(oinfo), CAPA_OPC_META_WRITE); + if (rc) + GOTO(out, rc); + + /* This would be very bad - accidentally truncating a file when + * changing the time or similar - bug 12203. */ + if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE && + oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) { + static char mdsinum[48]; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) + snprintf(mdsinum, sizeof(mdsinum) - 1, + " of inode "LPU64"/%u", oinfo->oi_oa->o_fid, + oinfo->oi_oa->o_generation); + else + mdsinum[0] = '\0'; + + CERROR("%s: setattr from %s trying to truncate objid "LPU64 + " %s\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), + oinfo->oi_oa->o_id, mdsinum); + GOTO(out, rc = -EPERM); + } + + info->fti_attr.la_valid = LA_MODE; + info->fti_attr.la_mode = S_IFREG | 0666; + + fo = filter_object_find_or_create(&env, ofd, &info->fti_fid, + &info->fti_attr); + if (IS_ERR(fo)) { + CERROR("can't find object %lu:%llu\n", + (long unsigned) info->fti_fid.f_oid, + info->fti_fid.f_seq); + GOTO(out, rc = PTR_ERR(fo)); + } + + la_from_obdo(&info->fti_attr, oinfo->oi_oa, oinfo->oi_oa->o_valid); + info->fti_attr.la_valid &= ~LA_TYPE; + + /* setting objects attributes (including owner/group) */ + rc = filter_attr_set(&env, fo, &info->fti_attr); + if (rc) { + 
GOTO(out_unlock, rc); + } + + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0, 0); + ldlm_resource_putref(res); + } + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + + /* Quota release needs uid/gid info */ + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + FILTER_VALID_FLAGS | LA_UID | LA_GID); + filter_info2oti(info, oti); +out_unlock: + filter_object_put(&env, fo); +out: + lu_env_fini(&env); + RETURN(rc); +} + +static int filter_punch(struct obd_export *exp, struct obd_info *oinfo, + struct obd_trans_info *oti, struct ptlrpc_request_set *rqset) +{ + struct filter_device *ofd = filter_exp(exp); + struct filter_thread_info *info; + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct filter_object *fo; + struct lu_env env; + int rc = 0; + + ENTRY; + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + info = filter_info_init(&env, exp); + filter_info2oti(info, oti); + + lu_idif_build(&info->fti_fid, oinfo->oi_oa->o_id, oinfo->oi_oa->o_gr); + lu_idif_resid(&info->fti_fid, &info->fti_resid); + + CDEBUG(D_INODE, "calling punch for object "LPU64", valid = "LPX64 + ", start = "LPD64", end = "LPD64"\n", oinfo->oi_oa->o_id, + oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end); + + rc = filter_auth_capa(ofd, &info->fti_fid, oinfo_mdsno(oinfo), + oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); + if (rc) + GOTO(out_env, rc); + + info->fti_attr.la_valid = LA_MODE; + info->fti_attr.la_mode = S_IFREG | 0666; + + fo = filter_object_find_or_create(&env, ofd, &info->fti_fid, + &info->fti_attr); + if (IS_ERR(fo)) { + CERROR("can't find object %lu:%llu\n", + (unsigned long) info->fti_fid.f_oid, + info->fti_fid.f_seq); + GOTO(out_env, rc = PTR_ERR(fo)); + } + + LASSERT(oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF); + if (oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF) { + /* Truncate case */ + oinfo->oi_oa->o_size = 
oinfo->oi_policy.l_extent.start;
+        } else if (oinfo->oi_policy.l_extent.end >= oinfo->oi_oa->o_size) {
+                oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.end;
+        }
+
+        rc = filter_object_punch(&env, fo, oinfo->oi_policy.l_extent.start,
+                                 oinfo->oi_policy.l_extent.end, oinfo->oi_oa);
+        if (rc)
+                GOTO(out, rc);
+
+        res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0);
+        if (res != NULL) {
+                ldlm_res_lvbo_update(res, NULL, 0, 0);
+                ldlm_resource_putref(res);
+        }
+
+        oinfo->oi_oa->o_valid = OBD_MD_FLID;
+        /* Quota release needs uid/gid info */
+        obdo_from_la(oinfo->oi_oa, &info->fti_attr,
+                     FILTER_VALID_FLAGS | LA_UID | LA_GID);
+        filter_info2oti(info, oti);
+
+out:
+        filter_object_put(&env, fo);
+out_env:
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/*
+ * Destroy the object identified by @fid.
+ *
+ * A PW extent lock covering [0, OBD_OBJECT_EOF] is enqueued with
+ * LDLM_AST_DISCARD_DATA and dropped again right away; only the side effect
+ * on the clients is wanted, so a failed enqueue is not treated as an error.
+ *
+ * Returns -ENOENT when the object does not exist, the error from
+ * filter_object_find() when the lookup fails, and otherwise the result of
+ * filter_object_destroy().
+ */
+static int filter_destroy_by_fid(const struct lu_env *env,
+                                 struct filter_device *ofd,
+                                 const struct lu_fid *fid)
+{
+        struct filter_thread_info *info = filter_info(env);
+        struct lustre_handle lockh;
+        int flags = LDLM_AST_DISCARD_DATA, rc = 0;
+        ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+        struct filter_object *fo;
+        ENTRY;
+
+        /* Tell the clients that the object is gone now and that they should
+         * throw away any cached pages. */
+        lu_idif_resid(fid, &info->fti_resid);
+        rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid,
+                                    LDLM_EXTENT, &policy, LCK_PW, &flags,
+                                    ldlm_blocking_ast, ldlm_completion_ast,
+                                    NULL, NULL, 0, NULL, NULL, &lockh);
+
+        /* We only care about the side-effects, just drop the lock. */
+        if (rc == ELDLM_OK)
+                ldlm_lock_decref(&lockh, LCK_PW);
+
+        fo = filter_object_find(env, ofd, fid);
+        if (IS_ERR(fo))
+                RETURN(PTR_ERR(fo));
+        LASSERT(fo != NULL);
+
+        if (!filter_object_exists(fo))
+                rc = -ENOENT;
+        else
+                rc = filter_object_destroy(env, fo);
+
+        filter_object_put(env, fo);
+        RETURN(rc);
+}
+
+/*
+ * OBD destroy method: destroy the object named by oa->o_id / oa->o_gr
+ * (group 0 is assumed when OBD_MD_FLGROUP is not set) and, when the request
+ * carries a llog cookie (OBD_MD_FLCOOKIE), cancel that cookie immediately.
+ *
+ * Returns the result of filter_destroy_by_fid(), including -ENOENT for an
+ * object that is already gone, or the lu_env_init()/filter_find_olg() error.
+ *
+ * NOTE(review): the cookie is also cancelled when the destroy failed with an
+ * error other than -ENOENT; confirm this is intended.
+ */
+int filter_destroy(struct obd_export *exp,
+                   struct obdo *oa, struct lov_stripe_md *md,
+                   struct obd_trans_info *oti, struct obd_export *md_exp, void *capa)
+{
+        struct lu_env env;
+        struct filter_device *ofd = filter_exp(exp);
+        struct filter_thread_info *info;
+        struct llog_cookie *fcc = NULL;
+        int rc = 0;
+        ENTRY;
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+        info = filter_info_init(&env, exp);
+
+        filter_oti2info(info, oti);
+
+        if (!(oa->o_valid & OBD_MD_FLGROUP))
+                oa->o_gr = 0;
+
+        lu_idif_build(&info->fti_fid, oa->o_id, oa->o_gr);
+        rc = filter_destroy_by_fid(&env, ofd, &info->fti_fid);
+        if (rc == -ENOENT) {
+                CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n",
+                       oa->o_id);
+                /* If object already gone, cancel cookie right now */
+                if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        struct obd_llog_group *olg;
+                        fcc = &oa->o_lcookie;
+                        olg = filter_find_olg(filter_obd(ofd), oa->o_gr);
+                        if (IS_ERR(olg))
+                                GOTO(out, rc = PTR_ERR(olg));
+                        llog_group_set_export(olg, exp);
+
+                        ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        llog_ctxt_put(ctxt);
+                        fcc = NULL; /* we didn't allocate fcc, don't free it */
+                }
+        } else {
+                /* XXX: no commit callbacks from DMU yet,
+                 * so cancel cookie immediately */
+                if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        fcc = &oa->o_lcookie;
+                        ctxt = llog_get_context(filter_obd(ofd),
+                                                fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        /* drop the context reference, as the -ENOENT
+                         * branch above does */
+                        llog_ctxt_put(ctxt);
+                        fcc = NULL; /* we didn't allocate fcc, don't free it */
+                }
+        }
+
+        filter_info2oti(info, oti);
+out:
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/*
+ * Destroy the precreated objects of group @gr whose ids lie in
+ * (mds_id, filter_last_id()], walking from the highest id downwards.
+ *
+ * The in-memory last id is lowered after every object and written out every
+ * 512 ids, then written once more at the end.  Per-object destroy failures
+ * are logged but do not stop the loop; the return value is the result of the
+ * final filter_last_id_write().
+ */
+static int filter_orphans_destroy(const struct lu_env *env,
+                                  struct filter_device *ofd,
+                                  obd_id mds_id, obd_gr gr)
+{
+        struct filter_thread_info *info = filter_info(env);
+        obd_id last, id;
+        int rc;
+        ENTRY;
+
+        //LASSERT(mutex_try_down(&ofd->ofd_create_locks[gr]) != 0);
+
+        last = filter_last_id(ofd, gr);
+        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
+              filter_obd(ofd)->obd_name, mds_id + 1, last);
+
+        for (id = last; id > mds_id; id--) {
+                lu_idif_build(&info->fti_fid, id, gr);
+                rc = filter_destroy_by_fid(env, ofd, &info->fti_fid);
+                if (rc && rc != -ENOENT) /* this is pretty fatal... */
+                        CEMERG("error destroying precreated id "LPU64": %d\n",
+                               id, rc);
+                filter_last_id_set(ofd, id - 1, gr);
+                /* update last_id on disk periodically so that if we restart
+                 * we don't need to re-scan all of the just-deleted objects. */
+                if ((id & 511) == 0)
+                        filter_last_id_write(env, ofd, gr, 0);
+        }
+        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
+               filter_obd(ofd)->obd_name, gr, mds_id);
+        rc = filter_last_id_write(env, ofd, gr, 1);
+        RETURN(rc);
+}
+
+/*
+ * OBD create method.  Three request flavours are handled:
+ *
+ *  - OBD_FL_RECREATE_OBJS: only validates that oa->o_id does not exceed the
+ *    last id of the group; nothing is created here.
+ *  - OBD_FL_DELORPHAN: destroys precreated objects above oa->o_id via
+ *    filter_orphans_destroy().
+ *  - otherwise: precreate up to oa->o_id.
+ *
+ * In the last two cases, when oa->o_id is above the current last id the ids
+ * in between are reserved by advancing and writing the last id, and oa->o_id
+ * is set to the new last id.
+ *
+ * ofd_create_locks[group] is taken only by the orphan-cleanup and precreate
+ * paths, so the exits taken before mutex_down() leave through out_env and
+ * skip the mutex_up().
+ */
+static int filter_create(struct obd_export *exp,
+                         struct obdo *oa, struct lov_stripe_md **ea,
+                         struct obd_trans_info *oti)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct filter_thread_info *info;
+        int rc = 0, diff;
+        obd_gr group = oa->o_gr;
+        struct lu_env env;
+        ENTRY;
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        info = filter_info_init(&env, exp);
+        filter_oti2info(info, oti);
+
+        LASSERT(ea == NULL);
+        LASSERT(group >= FILTER_GROUP_MDS0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+        CDEBUG(D_INFO|D_ERROR, "filter_create(oa->o_gr="LPU64",oa->o_id="LPU64")\n",
+               group, oa->o_id);
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                if (oa->o_id > filter_last_id(ofd, group)) {
+                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+                               oa->o_id, filter_last_id(ofd, group));
+                        /* create mutex not held yet */
+                        GOTO(out_env, rc = -EINVAL);
+                }
+                /* do nothing because we create objects during first write */
+                GOTO(out_env, rc = 0);
+        }
+        /* former filter_handle_precreate */
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_DELORPHAN)){
+                /* destroy orphans */
+                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
+                        CERROR("%s: dropping old orphan cleanup request\n",
+                               filter_obd(ofd)->obd_name);
+                        /* create mutex not held yet */
+                        GOTO(out_env, rc = 0);
+                }
+                /* This causes inflight precreates to abort and drop lock */
+                set_bit(group, &ofd->ofd_destroys_in_progress);
+                mutex_down(&ofd->ofd_create_locks[group]);
+                if (!test_bit(group, &ofd->ofd_destroys_in_progress)) {
+                        CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
+                               exp->exp_obd->obd_name, group);
+                        GOTO(out, rc = 0);
+                }
+                diff = oa->o_id - filter_last_id(ofd, group);
+                CDEBUG(D_HA, "filter_last_id() = "LPU64" -> diff = %d\n",
+                       filter_last_id(ofd, group), diff);
+                if (-diff > OST_MAX_PRECREATE) {
+                        /* FIXME: should reset precreate_next_id on MDS */
+                        rc = 0;
+                } else if (diff < 0) {
+                        rc = filter_orphans_destroy(&env, ofd, oa->o_id, group);
+                        clear_bit(group, &ofd->ofd_destroys_in_progress);
+                } else {
+                        /* XXX: Used by MDS for the first time! */
+                        clear_bit(group, &ofd->ofd_destroys_in_progress);
+                }
+        } else {
+                mutex_down(&ofd->ofd_create_locks[group]);
+                if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
+                        CERROR("%s: dropping old precreate request\n",
+                               filter_obd(ofd)->obd_name);
+                        GOTO(out, rc = 0);
+                }
+                /* only precreate if group == 0 and o_id is specified */
+                if (group < FILTER_GROUP_MDS0 || oa->o_id == 0) {
+                        LBUG();
+                        diff = 1; /* shouldn't we create this right now? */
+                } else {
+                        diff = oa->o_id - filter_last_id(ofd, group);
+                }
+        }
+        if (diff > 0) {
+                obd_id next_id = filter_last_id(ofd, group) + diff;
+
+                /* TODO: check we have free space. Need DMU support */
+                CDEBUG(D_HA,
+                       "%s: reserve %d objects in group "LPU64" at "LPU64"\n",
+                       filter_obd(ofd)->obd_name, diff, group, next_id - diff);
+                filter_last_id_set(ofd, next_id, group);
+                rc = filter_last_id_write(&env, ofd, group, 0);
+                if (rc)
+                        CERROR("unable to write lastobjid\n");
+                else
+                        oa->o_id = filter_last_id(ofd, group);
+
+                LASSERT(oa->o_gr == group);
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+        }
+
+        filter_info2oti(info, oti);
+out:
+        mutex_up(&ofd->ofd_create_locks[group]);
+out_env:
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/*
+ * OBD getattr method: check the capability for a metadata read, look the
+ * object up and copy its attributes into oinfo->oi_oa.
+ *
+ * o_valid is reset to OBD_MD_FLID once the object has been found; the
+ * remaining fields are filled in only when filter_attr_get() succeeds.
+ */
+int filter_getattr(struct obd_export *exp, struct obd_info *oinfo)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct filter_thread_info *info;
+        struct filter_object *fo;
+        struct lu_env env;
+        int rc = 0;
+        ENTRY;
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+        info = filter_info_init(&env, exp);
+
+        lu_idif_build(&info->fti_fid, oinfo->oi_oa->o_id, oinfo->oi_oa->o_gr);
+        rc = filter_auth_capa(ofd, &info->fti_fid, oinfo_mdsno(oinfo),
+                              oinfo_capa(oinfo), CAPA_OPC_META_READ);
+        if (rc)
+                GOTO(out, rc);
+
+        fo = filter_object_find(&env, ofd, &info->fti_fid);
+        if (IS_ERR(fo))
+                GOTO(out, rc = PTR_ERR(fo));
+        LASSERT(fo != NULL);
+        rc = filter_attr_get(&env, fo, &info->fti_attr);
+        oinfo->oi_oa->o_valid = OBD_MD_FLID;
+        if (rc == 0)
+                obdo_from_la(oinfo->oi_oa, &info->fti_attr,
+                             FILTER_VALID_FLAGS | LA_UID | LA_GID);
+
+        filter_object_put(&env, fo);
+out:
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/*
+ * OBD sync method: syncs the whole underlying OSD device with dt_sync().
+ * The @oa, @ea, @start, @end and @capa arguments are not used.
+ */
+static int filter_sync(struct obd_export *exp, struct obdo *oa,
+                       struct lov_stripe_md *ea, obd_size start,
+                       obd_size end, void *capa)
+{
+        struct filter_device *ofd = filter_exp(exp);
+        struct lu_env env;
+        int rc;
+
+        ENTRY;
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        rc = dt_sync(&env, ofd->ofd_osd);
+        /* TODO: see filter.c in obdfilter/
+           filter_sync_llogs(exp->exp_obd, exp);
+         */
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/*
+ * OBD ioctl method.  Handles OBD_IOC_ABORT_RECOVERY, OBD_IOC_SYNC and
+ * OBD_IOC_SET_READONLY; any other command returns -EOPNOTSUPP.
+ * The @len, @karg and @uarg arguments are not used.
+ */
+int filter_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
+                     void *karg, void *uarg)
+{
+        struct lu_env env;
+        struct filter_device *ofd = filter_exp(exp);
+        struct obd_device *obd = filter_obd(ofd);
+        int rc;
+
+        ENTRY;
+
+        CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        switch (cmd) {
+        case OBD_IOC_ABORT_RECOVERY:
+                CERROR("aborting recovery for device %s\n", obd->obd_name);
+                target_stop_recovery_thread(obd);
+                break;
+        case OBD_IOC_SYNC:
+                CDEBUG(D_RPCTRACE, "syncing ost %s\n", obd->obd_name);
+                rc = dt_sync(&env, ofd->ofd_osd);
+                break;
+        case OBD_IOC_SET_READONLY:
+                /* flush first; the sync result is what gets returned */
+                rc = dt_sync(&env, ofd->ofd_osd);
+                dt_ro(&env, ofd->ofd_osd);
+                break;
+        default:
+                CERROR("Not supported cmd = %d for device %s\n",
+                       cmd, obd->obd_name);
+                rc = -EOPNOTSUPP;
+        }
+
+        lu_env_fini(&env);
+        RETURN(rc);
+}
+
+/* OBD method table of the OFD; each method is listed exactly once. */
+struct obd_ops filter_obd_ops = {
+        .o_owner = THIS_MODULE,
+        .o_notify = filter_obd_notify,
+        .o_connect = filter_obd_connect,
+        .o_reconnect = filter_obd_reconnect,
+        .o_disconnect = filter_obd_disconnect,
+        .o_set_info_async = filter_set_info_async,
+        .o_get_info = filter_get_info,
+        .o_llog_init = filter_llog_init,
+        .o_llog_finish = filter_llog_finish,
+        .o_create = filter_create,
+        .o_statfs = filter_statfs,
+        .o_setattr = filter_setattr,
+        .o_preprw = filter_preprw,
+        .o_commitrw = filter_commitrw,
+        .o_destroy = filter_destroy,
+        .o_init_export = filter_init_export,
+        .o_destroy_export = filter_destroy_export,
+        .o_punch = filter_punch,
+        .o_getattr = filter_getattr,
+        .o_sync = filter_sync,
+        .o_iocontrol = filter_iocontrol,
+
+/*      .o_setup = filter_setup,
+        .o_precleanup = filter_precleanup,
+        .o_cleanup = filter_cleanup,
+        .o_connect = filter_connect,
+        .o_ping = filter_ping,
+        .o_unpackmd = filter_unpackmd,
+        .o_brw = filter_brw,
+
+        .o_llog_connect = filter_llog_connect,
+        .o_health_check = filter_health_check,
+        .o_process_config = filter_process_config,*/
+};
+
+
diff --git
a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c new file mode 100644 index 0000000..0fb479e --- /dev/null +++ b/lustre/ofd/ofd_objects.c @@ -0,0 +1,236 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/ofd/ofd_objects.c + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Alex Tomas + * Author: Mike Pershin + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */
+
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * Look up the filter object for @fid on @ofd.
+ *
+ * Returns the object with the reference taken by lu_object_find() (release
+ * it with filter_object_put()), or the ERR_PTR value lu_object_find()
+ * returned.
+ */
+struct filter_object *filter_object_find(const struct lu_env *env,
+                                         struct filter_device *ofd,
+                                         const struct lu_fid *fid)
+{
+        struct filter_object *fo;
+        struct lu_object *o;
+        ENTRY;
+
+        o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
+        if (likely(!IS_ERR(o)))
+                fo = filter_obj(o);
+        else
+                fo = (struct filter_object *)o; /* return error */
+        RETURN(fo);
+}
+
+/*
+ * Look up the object for @fid and create it as a regular file with
+ * attributes @attr when it does not exist yet.
+ *
+ * Existence is checked again under the object write lock before creating.
+ * Returns the referenced object, or an ERR_PTR; on error the reference
+ * obtained from the lookup has already been dropped.
+ */
+struct filter_object *filter_object_find_or_create(const struct lu_env *env,
+                                                   struct filter_device *ofd,
+                                                   const struct lu_fid *fid,
+                                                   struct lu_attr *attr)
+{
+        struct filter_object *fo;
+        struct dt_object *next;
+        struct thandle *th;
+        struct dt_object_format dof;
+        int rc;
+        ENTRY;
+
+        fo = filter_object_find(env, ofd, fid);
+        if (IS_ERR(fo))
+                RETURN(fo);
+
+        LASSERT(fo != NULL);
+        if (filter_object_exists(fo))
+                RETURN(fo);
+
+        next = filter_object_child(fo);
+        LASSERT(next != NULL);
+
+        th = filter_trans_create(env, ofd);
+        if (IS_ERR(th))
+                GOTO(out, rc = PTR_ERR(th));
+
+        rc = dt_declare_create(env, next, attr, NULL, NULL, th);
+        LASSERT(rc == 0);
+
+        rc = filter_trans_start(env, ofd, th);
+        if (rc)
+                GOTO(trans_stop, rc);
+
+        filter_write_lock(env, fo, 0);
+        /* somebody else may have created it while we were unlocked */
+        if (filter_object_exists(fo))
+                GOTO(unlock, rc = 0);
+
+        CDEBUG(D_OTHER, "create new object %lu:%llu\n",
+               (unsigned long) fid->f_oid, fid->f_seq);
+
+        dof.dof_type = dt_mode_to_dft(S_IFREG);
+        rc = dt_create(env, next, attr, NULL, &dof, th);
+        LASSERT(rc == 0);
+        LASSERT(filter_object_exists(fo));
+
+unlock:
+        filter_write_unlock(env, fo);
+
+trans_stop:
+        filter_trans_stop(env, ofd, th);
+out:
+        if (rc) {
+                filter_object_put(env, fo);
+                RETURN(ERR_PTR(rc));
+        }
+        RETURN(fo);
+}
+
+/* Drop a reference on @fo obtained from one of the lookup functions. */
+void filter_object_put(const struct lu_env *env, struct filter_object *fo)
+{
+        lu_object_put(env, &fo->ofo_obj.do_lu);
+}
+
+/*
+ * Set the attributes @la on @fo inside a transaction of its own.
+ *
+ * When any of atime/mtime/ctime is being set, the per-export modification
+ * data of the object records the xid of the current request if it is newer
+ * than the one stored.
+ *
+ * Returns 0 or a negative errno.  The transaction handle is stopped on every
+ * path after it has been created, including a failed start.
+ */
+int filter_attr_set(const struct lu_env *env, struct filter_object *fo,
+                    const struct lu_attr *la)
+{
+        struct thandle *th;
+        struct filter_device *ofd = filter_obj2dev(fo);
+        struct filter_thread_info *info = filter_info(env);
+        struct filter_mod_data *fmd;
+        int rc;
+        ENTRY;
+
+        if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
+                fmd = filter_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
+                if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
+                        fmd->fmd_mactime_xid = info->fti_xid;
+                filter_fmd_put(info->fti_exp, fmd);
+        }
+
+        th = filter_trans_create(env, ofd);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = dt_declare_attr_set(env, filter_object_child(fo), la, th,
+                                 filter_object_capa(env, fo));
+        LASSERT(rc == 0);
+
+        rc = filter_trans_start(env, ofd, th);
+        if (rc)
+                GOTO(trans_stop, rc);
+
+        rc = dt_attr_set(env, filter_object_child(fo), la, th);
+
+trans_stop:
+        filter_trans_stop(env, ofd, th);
+
+        RETURN(rc);
+}
+
+/*
+ * Truncate @fo to @start bytes and apply the mtime/atime/ctime carried in
+ * @oa.  Only truncate is implemented: @end must be OBD_OBJECT_EOF.
+ *
+ * Returns 0 or a negative errno.  The transaction handle is stopped on every
+ * path after it has been created, including a failed start.
+ */
+int filter_object_punch(const struct lu_env *env, struct filter_object *fo,
+                        __u64 start, __u64 end, struct obdo *oa)
+{
+        struct thandle *th;
+        struct filter_device *ofd = filter_obj2dev(fo);
+        struct filter_thread_info *info = filter_info(env);
+        struct filter_mod_data *fmd;
+        struct lu_attr attr;
+        int rc;
+        ENTRY;
+
+        /* we support truncate, not punch yet */
+        LASSERT(end == OBD_OBJECT_EOF);
+
+        fmd = filter_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
+        if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
+                fmd->fmd_mactime_xid = info->fti_xid;
+        filter_fmd_put(info->fti_exp, fmd);
+
+        la_from_obdo(&attr, oa, OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+        /* diagnostic only, so it must not be logged as an error */
+        CDEBUG(D_INFO, "attr=%llu\n", oa->o_valid);
+        attr.la_size = start;
+        attr.la_valid |= LA_SIZE;
+
+        th = filter_trans_create(env, ofd);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = dt_declare_attr_set(env, filter_object_child(fo), &attr, th,
+                                 filter_object_capa(env, fo));
+        LASSERT(rc == 0);
+
+        rc = filter_trans_start(env, ofd, th);
+        if (rc)
+                GOTO(trans_stop, rc);
+
+        rc = dt_attr_set(env, filter_object_child(fo), &attr, th);
+
+trans_stop:
+        filter_trans_stop(env, ofd, th);
+
+        RETURN(rc);
+
+}
+
+/*
+ * Destroy @fo: drop its per-export modification data and delete the
+ * reference on the underlying object under the object write lock.
+ *
+ * Returns 0, or the error from creating or starting the transaction.  The
+ * transaction handle is stopped on every path after it has been created.
+ */
+int filter_object_destroy(const struct lu_env *env, struct filter_object *fo)
+{
+        struct thandle *th;
+        int rc = 0;
+        ENTRY;
+
+        th = filter_trans_create(env, filter_obj2dev(fo));
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+        dt_declare_ref_del(env, filter_object_child(fo), th);
+        rc = filter_trans_start(env, filter_obj2dev(fo), th);
+        if (rc)
+                GOTO(trans_stop, rc);
+
+        filter_fmd_drop(filter_info(env)->fti_exp, &fo->ofo_header.loh_fid);
+
+        filter_write_lock(env, fo, 0);
+        dt_ref_del(env, filter_object_child(fo), th);
+        filter_write_unlock(env, fo);
+
+trans_stop:
+        filter_trans_stop(env, filter_obj2dev(fo), th);
+
+        RETURN(rc);
+}
+
+/*
+ * Read the attributes of @fo into @la.
+ *
+ * An object that does not exist yet is not an error: size, blocks and the
+ * three timestamps are reported as zero and 0 is returned.
+ */
+int filter_attr_get(const struct lu_env *env, struct filter_object *fo,
+                    struct lu_attr *la)
+{
+        int rc = 0;
+
+        /* CROW allows the object not to exist */
+        if (filter_object_exists(fo)) {
+                rc = dt_attr_get(env, filter_object_child(fo), la,
+                                 filter_object_capa(env, fo));
+        } else {
+                la->la_size = 0;
+                la->la_blocks = 0;
+                la->la_atime = 0;
+                la->la_ctime = 0;
+                la->la_mtime = 0;
+                la->la_valid = LA_SIZE | LA_BLOCKS |
+                               LA_ATIME | LA_CTIME | LA_MTIME;
+        }
+
+        return rc;
+}
diff --git a/lustre/ofd/ofd_recovery.c b/lustre/ofd/ofd_recovery.c
new file mode 100644
index 0000000..763178c
--- /dev/null
+++ b/lustre/ofd/ofd_recovery.c
@@ -0,0 +1,337 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/ofd/ofd_recovery.c
+ *
+ * Copyright (c) 2007 Cluster File Systems, Inc.
+ * Author: Nikita Danilov
+ * Author: Alex Tomas
+ * Author: Mike Pershin
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * Create a transaction handle on the OSD of @ofd.
+ *
+ * When the calling thread has an export attached (info->fti_exp), the writes
+ * of that client's last_rcvd slot and of the last_rcvd header are declared
+ * as part of the transaction.
+ *
+ * Returns the handle, or the ERR_PTR value from dt_trans_create().
+ */
+struct thandle *filter_trans_create(const struct lu_env *env,
+                                    struct filter_device *ofd)
+{
+        struct filter_thread_info *info = filter_info(env);
+        struct thandle *th;
+        struct filter_export_data *fed;
+        int rc;
+
+#if 0
+        /* export can require sync operations */
+        if (info->fti_exp != NULL)
+                p->tp_sync = info->fti_exp->exp_need_sync;
+#endif
+
+        th = dt_trans_create(env, ofd->ofd_osd);
+        if (IS_ERR(th))
+                return th;
+
+        /* no last_rcvd update needed */
+        if (info->fti_exp == NULL)
+                return th;
+
+        /* declare last_rcvd update */
+        fed = &info->fti_exp->exp_filter_data;
+        rc = dt_declare_record_write(env, ofd->ofd_last_rcvd, fed->fed_lr_off,
+                                     sizeof(*fed->fed_lcd), th, BYPASS_CAPA);
+        /* check this result too, before rc is reused below */
+        LASSERT(rc == 0);
+
+        /* declare last_rcvd header update */
+        rc = dt_declare_record_write(env, ofd->ofd_last_rcvd, 0,
+                                     sizeof(ofd->ofd_fsd), th, BYPASS_CAPA);
+        LASSERT(rc == 0);
+
+        return th;
+}
+
+/* Start transaction @th on the OSD of @ofd; a failure is logged. */
+int filter_trans_start(const struct lu_env *env,
+                       struct filter_device *ofd,
+                       struct thandle *th)
+{
+        int rc;
+        rc = ofd->ofd_osd->dd_ops->dt_trans_start(env, ofd->ofd_osd, th);
+        if (rc)
+                CERROR("Cannot start transaction, err =%d\n", rc);
+        return rc;
+}
+
+/* Stop transaction @th on the OSD of @ofd. */
+void filter_trans_stop(const struct lu_env *env,
+                       struct filter_device *ofd,
+                       struct thandle *th)
+{
+        ofd->ofd_osd->dd_ops->dt_trans_stop(env, th);
+}
+
+/*
+ * last_rcvd & last_committed update callbacks
+ */
+
+/*
+ * Store the xid and transno of the current request in the client's last_rcvd
+ * slot and write the slot out within transaction @th.
+ *
+ * For an export that has already failed nothing is written and
+ * th->th_result is returned, or -ENOTCONN when that result is 0.
+ * Otherwise the result of filter_last_rcvd_write() is returned.
+ */
+static int filter_last_rcvd_update(struct filter_thread_info *info,
+                                   struct thandle *th)
+{
+        struct filter_device *ofd;
+        struct filter_export_data *fed;
+        struct lsd_client_data *lcd;
+        __s32 rc = th->th_result;
+        __u64 *transno_p;
+        loff_t off;
+        int err;
+        ENTRY;
+
+        /* check the export before anything is derived from it */
+        LASSERT(info->fti_exp);
+        ofd = filter_exp(info->fti_exp);
+        LASSERT(ofd);
+
+        fed = &info->fti_exp->exp_filter_data;
+        LASSERT(fed);
+        lcd = fed->fed_lcd;
+
+        /* if the export has already been failed, we have no last_rcvd slot */
+        if (info->fti_exp->exp_failed) {
+                CWARN("commit transaction for disconnected client %s: rc %d\n",
+                      info->fti_exp->exp_client_uuid.uuid, rc);
+                if (rc == 0)
+                        rc = -ENOTCONN;
+                RETURN(rc);
+        }
+        LASSERT(lcd);
+        off = fed->fed_lr_off;
+
+        mutex_down(&fed->fed_lastrcvd_lock);
+
+        transno_p = &lcd->lcd_last_transno;
+        lcd->lcd_last_xid = info->fti_xid;
+
+        /*
+         * When we store a zero transno in the client data we can lose the
+         * last transno value, because the client data contains 0 while the
+         * server data is not yet written.
+         * The server data should be updated also if the latest
+         * transno is rewritten by zero. See the bug 11125 for details.
+         */
+        if (info->fti_transno == 0 &&
+            *transno_p == ofd->ofd_last_transno) {
+                spin_lock(&ofd->ofd_transno_lock);
+                ofd->ofd_fsd.lsd_last_transno = ofd->ofd_last_transno;
+                spin_unlock(&ofd->ofd_transno_lock);
+                filter_last_rcvd_header_write(info->fti_env, ofd, th);
+        }
+
+        *transno_p = info->fti_transno;
+        LASSERT(fed->fed_lr_off > 0);
+        err = filter_last_rcvd_write(info->fti_env, ofd, lcd, &off, th);
+
+        mutex_up(&fed->fed_lastrcvd_lock);
+        RETURN(err);
+}
+
+/* add credits for last_rcvd update */
+static int filter_txn_start_cb(const struct lu_env *env,
+                               struct thandle *handle,
+                               void *cookie)
+{
+        return 0;
+}
+
+/*
+ * Transaction stop callback: update last_rcvd records with the latest
+ * transaction data.
+ *
+ * A new transno is assigned when the request carries none and the
+ * transaction succeeded; for a request that already carries one,
+ * ofd_last_transno is raised to it if needed.  The transno is saved in the
+ * transaction context for the commit callback before last_rcvd is updated.
+ */
+static int filter_txn_stop_cb(const struct lu_env *env,
+                              struct thandle *txn, void *cookie)
+{
+        struct filter_device *ofd = cookie;
+        struct filter_txn_info *txi;
+        struct filter_thread_info *info = filter_info(env);
+        ENTRY;
+
+        /* transno in two contexts - for commit_cb and for thread */
+        txi = lu_context_key_get(&txn->th_ctx, &filter_txn_thread_key);
+
+        if (info->fti_exp == NULL || info->fti_no_need_trans ||
+            info->fti_exp->exp_filter_data.fed_lcd == NULL) {
+                txi->txi_transno = 0;
+                info->fti_no_need_trans = 0;
+                RETURN(0);
+        }
+
+        LASSERT(filter_exp(info->fti_exp) == ofd);
+        if (info->fti_has_trans) {
+                /* XXX: currently there are allowed cases, but the wrong cases
+                 * are also possible, so better check is needed here */
+                CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
+                       info->fti_transno);
+                RETURN(0);
+        }
+
+        info->fti_has_trans = 1;
+        spin_lock(&ofd->ofd_transno_lock);
+        if (txn->th_result != 0) {
+                if (info->fti_transno != 0) {
+                        CERROR("Replay transno "LPU64" failed: rc %i\n",
+                               info->fti_transno, txn->th_result);
+                        info->fti_transno = 0;
+                }
+        } else if (info->fti_transno == 0) {
+                info->fti_transno = ++ ofd->ofd_last_transno;
+        } else {
+                /* should be replay */
+                if (info->fti_transno > ofd->ofd_last_transno)
+                        ofd->ofd_last_transno = info->fti_transno;
+        }
+
+        /* filling reply data */
+        CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
+               info->fti_transno, filter_obd(ofd)->obd_last_committed);
+
+        /* save transno for the commit callback */
+        txi->txi_transno = info->fti_transno;
+        spin_unlock(&ofd->ofd_transno_lock);
+
+        RETURN(filter_last_rcvd_update(info, txn));
+}
+
+/*
+ * Commit callback: advance obd_last_committed to the committed transno,
+ * commit the pending replies when it moved, and run the additional
+ * callbacks registered in the transaction info.
+ */
+static int filter_txn_commit_cb(const struct lu_env *env,
+                                struct thandle *txn, void *cookie)
+{
+        struct filter_device *ofd = cookie;
+        struct obd_device *obd = filter_obd(ofd);
+        struct filter_txn_info *txi;
+        int i;
+
+        txi = lu_context_key_get(&txn->th_ctx, &filter_txn_thread_key);
+
+        /* copy of obd_transno_commit_cb() but with locking */
+        spin_lock(&ofd->ofd_transno_lock);
+        if (txi->txi_transno > obd->obd_last_committed) {
+                obd->obd_last_committed = txi->txi_transno;
+                spin_unlock(&ofd->ofd_transno_lock);
+                ptlrpc_commit_replies(obd);
+        } else
+                spin_unlock(&ofd->ofd_transno_lock);
+
+        if (txi->txi_transno)
+                CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
+                       obd->obd_name, txi->txi_transno);
+
+        /* iterate through all additional callbacks */
+        for (i = 0; i < txi->txi_cb_count; i++) {
+                txi->txi_cb[i].filter_cb_func(ofd, txi->txi_transno,
+                                              txi->txi_cb[i].filter_cb_data,
+                                              0);
+        }
+        return 0;
+}
+
+/*
+ * Set up the on-disk state of the OFD: allocate the last_rcvd slot bitmap,
+ * register the transaction callbacks, open (creating if needed) the
+ * last_rcvd and groups objects and initialise the server and group data
+ * from them.
+ *
+ * Returns 0 on success or a negative errno.  On failure everything acquired
+ * so far is released again, in reverse order.
+ */
+int filter_fs_setup(const struct lu_env *env, struct filter_device *ofd,
+                    struct obd_device *obd)
+{
+        struct lu_fid fid;
+        struct filter_object *fo;
+        struct lu_attr attr;
+        int rc = 0;
+        ENTRY;
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
+                RETURN (-ENOENT);
+
+        OBD_ALLOC(ofd->ofd_last_rcvd_slots, LR_MAX_CLIENTS / 8);
+        if (ofd->ofd_last_rcvd_slots == NULL)
+                RETURN(-ENOMEM);
+
+        /* prepare transactions callbacks */
+        ofd->ofd_txn_cb.dtc_txn_start = filter_txn_start_cb;
+        ofd->ofd_txn_cb.dtc_txn_stop = filter_txn_stop_cb;
+        ofd->ofd_txn_cb.dtc_txn_commit = filter_txn_commit_cb;
+        ofd->ofd_txn_cb.dtc_cookie = ofd;
+        CFS_INIT_LIST_HEAD(&ofd->ofd_txn_cb.dtc_linkage);
+
+        dt_txn_callback_add(ofd->ofd_osd, &ofd->ofd_txn_cb);
+
+        lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
+        memset(&attr, 0, sizeof(attr));
+        attr.la_valid = LA_MODE;
+        attr.la_mode = S_IFREG | 0666;
+
+        fo = filter_object_find_or_create(env, ofd, &fid, &attr);
+        if (IS_ERR(fo)) {
+                rc = PTR_ERR(fo);
+                CERROR("cannot open last_rcvd object: rc = %d\n", rc);
+                GOTO(del_cb, rc);
+        }
+        ofd->ofd_last_rcvd = filter_object_child(fo);
+        rc = filter_server_data_init(env, ofd);
+        if (rc)
+                GOTO(put_last_rcvd, rc);
+
+        lu_local_obj_fid(&fid, MDD_OBJECTS_OID);
+        memset(&attr, 0, sizeof(attr));
+        attr.la_valid = LA_MODE;
+        attr.la_mode = S_IFREG | 0666;
+
+        fo = filter_object_find_or_create(env, ofd, &fid, &attr);
+        if (IS_ERR(fo)) {
+                rc = PTR_ERR(fo);
+                CERROR("cannot open groups object: rc = %d\n", rc);
+                GOTO(stop_recov, rc);
+        }
+        ofd->ofd_groups_file = filter_object_child(fo);
+        rc = filter_groups_init(env, ofd);
+        if (rc)
+                GOTO(put_groups, rc);
+
+        RETURN(0);
+
+put_groups:
+        lu_object_put(env, &ofd->ofd_groups_file->do_lu);
+        ofd->ofd_groups_file = NULL;
+stop_recov:
+        target_recovery_fini(obd);
+put_last_rcvd:
+        lu_object_put(env, &ofd->ofd_last_rcvd->do_lu);
+        ofd->ofd_last_rcvd = NULL;
+del_cb:
+        /* the callbacks point at ofd, so they must not outlive a failed
+         * setup */
+        dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
+        OBD_FREE(ofd->ofd_last_rcvd_slots, LR_MAX_CLIENTS / 8);
+        ofd->ofd_last_rcvd_slots = NULL;
+        RETURN(rc);
+}
+
+/*
+ * Undo filter_fs_setup(): unregister the transaction callbacks and release
+ * the last_rcvd object.
+ *
+ * NOTE(review): the unconditional LBUG() makes everything below it
+ * unreachable; it looks like a deliberate "not supported yet" marker and is
+ * left in place.  When it is removed, ofd_groups_file and
+ * ofd_last_rcvd_slots presumably need releasing here as well - confirm
+ * against filter_fs_setup().
+ */
+void filter_fs_cleanup(const struct lu_env *env, struct filter_device *ofd)
+{
+        ENTRY;
+
+        LBUG();
+
+        /* Remove transaction callback */
+        dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
+        if (ofd->ofd_last_rcvd)
+                lu_object_put(env, &ofd->ofd_last_rcvd->do_lu);
+        ofd->ofd_last_rcvd = NULL;
+        EXIT;
+}
+
+
-- 
1.8.3.1