struct filter_client_data *fed_fcd;
loff_t fed_lr_off;
int fed_lr_idx;
+ int fed_grant_waiting;
+ __u64 fed_cached; /* both in bytes */
+ __u64 fed_grant;
+ __u64 fed_grant_sent;
};
struct obd_export {
#ifdef __KERNEL__
#include <linux/obd.h>
-#include <linux/fs.h>
typedef void (*fsfilt_cb_t)(struct obd_device *obd, __u64 last_rcvd,
void *data, int error);
cb_func, cb_data);
}
+/* very similar to obd_statfs(), but caller already holds obd_osfs_lock */
static inline int fsfilt_statfs(struct obd_device *obd, struct super_block *sb,
- struct obd_statfs *osfs)
+ unsigned long max_age)
{
- return obd->obd_fsops->fs_statfs(sb, osfs);
+ int rc = 0;
+
+ CDEBUG(D_SUPER, "osfs %lu, max_age %lu\n", obd->obd_osfs_age, max_age);
+ if (time_before(obd->obd_osfs_age, max_age)) {
+ rc = obd->obd_fsops->fs_statfs(sb, &obd->obd_osfs);
+ if (rc == 0) /* N.B. statfs can't really fail */
+ obd->obd_osfs_age = jiffies;
+ } else {
+ CDEBUG(D_SUPER, "using cached obd_statfs data\n");
+ }
+
+ return rc;
}
static inline int fsfilt_sync(struct obd_device *obd, struct super_block *sb)
typedef uint32_t obd_mode;
typedef uint32_t obd_uid;
typedef uint32_t obd_gid;
-typedef uint64_t obd_rdev;
typedef uint32_t obd_flag;
typedef uint32_t obd_count;
obd_time o_ctime;
obd_size o_size;
obd_blocks o_blocks; /* brw: clients sent cached bytes */
- obd_rdev o_rdev; /* brw: clients/servers sent grant */
+ obd_size o_grant;
obd_blksize o_blksize; /* optimal IO blocksize */
obd_mode o_mode;
obd_uid o_uid;
#define OBD_MD_FLGROUP (0x01000000) /* group */
#define OBD_MD_FLIFID (0x02000000) /* ->ost write inline fid */
#define OBD_MD_FLEPOCH (0x04000000) /* ->ost write easize is epoch */
+#define OBD_MD_FLGRANT (0x08000000) /* ost preallocation space grant */
#define OBD_MD_FLNOTOBD (~(OBD_MD_FLOBDFLG | OBD_MD_FLBLOCKS | OBD_MD_LINKNAME|\
OBD_MD_FLEASIZE | OBD_MD_FLHANDLE | OBD_MD_FLCKSUM|\
OBD_MD_FLQOS | OBD_MD_FLOSCOPQ | OBD_MD_FLCOOKIE))
struct list_head fo_export_list;
int fo_subdir_count;
- spinlock_t fo_grant_lock; /* protects tot_granted */
- obd_size fo_tot_granted;
+ obd_size fo_tot_granted; /* protected by obd_osfs_lock */
obd_size fo_tot_cached;
struct obd_import *fo_mdc_imp;
//struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */
void *cl_llcd_offset;
- struct semaphore cl_dirty_sem;
obd_size cl_dirty; /* all _dirty_ in bytes */
obd_size cl_dirty_granted; /* from ost */
obd_size cl_dirty_max; /* allowed w/o rpc */
spinlock_t obd_dev_lock;
__u64 obd_last_committed;
struct fsfilt_operations *obd_fsops;
+ spinlock_t obd_osfs_lock;
struct llog_ctxt *obd_llog_ctxt[LLOG_MAX_CTXTS];
struct obd_statfs obd_osfs;
unsigned long obd_osfs_age;
OBD_COUNTER_INCREMENT(obd, statfs);
CDEBUG(D_SUPER, "osfs %lu, max_age %lu\n", obd->obd_osfs_age, max_age);
- if (obd->obd_osfs_age == 0 || time_before(obd->obd_osfs_age, max_age)) {
+ if (time_before(obd->obd_osfs_age, max_age)) {
rc = OBP(obd, statfs)(obd, osfs, max_age);
- spin_lock(&obd->obd_dev_lock);
+ spin_lock(&obd->obd_osfs_lock);
memcpy(&obd->obd_osfs, osfs, sizeof(obd->obd_osfs));
obd->obd_osfs_age = jiffies;
- spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_osfs_lock);
} else {
CDEBUG(D_SUPER, "using cached obd_statfs data\n");
- spin_lock(&obd->obd_dev_lock);
+ spin_lock(&obd->obd_osfs_lock);
memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
- spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_osfs_lock);
}
RETURN(rc);
}
memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
sizeof(server_uuid)));
- init_MUTEX(&cli->cl_dirty_sem);
cli->cl_dirty = 0;
cli->cl_dirty_granted = 0;
cli->cl_dirty_max = 64*1024*1024; /* some default */
lli->lli_st_gid = src->o_gid;
if (valid & OBD_MD_FLFLAGS)
lli->lli_st_flags = src->o_flags;
- if (valid & OBD_MD_FLNLINK)
- lli->lli_st_nlink = src->o_nlink;
if (valid & OBD_MD_FLGENER)
lli->lli_st_generation = src->o_generation;
- if (valid & OBD_MD_FLRDEV)
- lli->lli_st_rdev = to_kdev_t(src->o_rdev);
}
#define S_IRWXUGO (S_IRWXU|S_IRWXG|S_IRWXO)
dst->o_flags = lli->lli_st_flags;
newvalid |= OBD_MD_FLFLAGS;
}
- if (valid & OBD_MD_FLNLINK) {
- dst->o_nlink = lli->lli_st_nlink;
- newvalid |= OBD_MD_FLNLINK;
- }
if (valid & OBD_MD_FLGENER) {
dst->o_generation = lli->lli_st_generation;
newvalid |= OBD_MD_FLGENER;
}
- if (valid & OBD_MD_FLRDEV) {
- dst->o_rdev = (__u32)kdev_t_to_nr(lli->lli_st_rdev);
- newvalid |= OBD_MD_FLRDEV;
- }
dst->o_valid |= newvalid;
}
static int fsfilt_ext3_statfs(struct super_block *sb, struct obd_statfs *osfs)
{
struct kstatfs sfs;
- int rc = vfs_statfs(sb, &sfs);
+ int rc;
+
+ memset(&sfs, 0, sizeof(sfs));
+
+ rc = sb->s_op->statfs(sb, &sfs);
if (!rc && sfs.f_bfree < sfs.f_ffree) {
sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree;
static int fsfilt_extN_statfs(struct super_block *sb, struct obd_statfs *osfs)
{
struct kstatfs sfs;
- int rc = vfs_statfs(sb, &sfs);
+ int rc;
+
+ memset(&sfs, 0, sizeof(sfs));
+
+ rc = sb->s_op->statfs(sb, &sfs);
if (!rc && sfs.f_bfree < sfs.f_ffree) {
sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree;
return 0;
}
-static int fsfilt_reiserfs_statfs(struct super_block *sb, struct obd_statfs *osfs)
+static int fsfilt_reiserfs_statfs(struct super_block *sb,
+ struct obd_statfs *osfs)
{
- struct statfs sfs;
- int rc = vfs_statfs(sb, &sfs);
+ struct kstatfs sfs;
+ int rc;
+
+ memset(&sfs, 0, sizeof(sfs));
+
+ rc = sb->s_op->statfs(sb, &sfs);
statfs_pack(osfs, &sfs);
return rc;
static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
- return fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
+ int rc;
+
+ spin_lock(&obd->obd_osfs_lock);
+ rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
+ if (rc == 0)
+ memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
+ spin_unlock(&obd->obd_osfs_lock);
+
+ return rc;
}
static int mds_statfs(struct ptlrpc_request *req)
}
/* We call this so that we can cache a bit - 1 jiffie worth */
- rc = obd_statfs(obd, lustre_msg_buf(req->rq_repmsg,0,size),jiffies-HZ);
+ rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
+ jiffies - HZ);
if (rc) {
CERROR("mds_obd_statfs failed: rc %d\n", rc);
GOTO(out, rc);
INIT_LIST_HEAD(&obd->obd_exports);
obd->obd_num_exports = 0;
spin_lock_init(&obd->obd_dev_lock);
+ spin_lock_init(&obd->obd_osfs_lock);
+ obd->obd_osfs_age = jiffies - 1000 * HZ;
init_waitqueue_head(&obd->obd_refcount_waitq);
/* XXX belongs in setup not attach */
dst->o_flags = src->i_flags;
newvalid |= OBD_MD_FLFLAGS;
}
- if (valid & OBD_MD_FLNLINK) {
- dst->o_nlink = src->i_nlink;
- newvalid |= OBD_MD_FLNLINK;
- }
if (valid & OBD_MD_FLGENER) {
dst->o_generation = src->i_generation;
newvalid |= OBD_MD_FLGENER;
}
- if (valid & OBD_MD_FLRDEV) {
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- dst->o_rdev = (__u32)kdev_t_to_nr(src->i_rdev);
-#else
- dst->o_rdev = (__u32)old_decode_dev(src->i_rdev);
-#endif
- newvalid |= OBD_MD_FLRDEV;
- }
dst->o_valid |= newvalid;
}
dst->i_gid = src->o_gid;
if (valid & OBD_MD_FLFLAGS)
dst->i_flags = src->o_flags;
- if (valid & OBD_MD_FLNLINK)
- dst->i_nlink = src->o_nlink;
if (valid & OBD_MD_FLGENER)
dst->i_generation = src->o_generation;
- if (valid & OBD_MD_FLRDEV)
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- dst->i_rdev = to_kdev_t(src->o_rdev);
-#else
- dst->i_rdev = old_decode_dev(src->o_rdev);
-#endif
}
EXPORT_SYMBOL(obdo_to_inode);
#endif
dst->o_gid = src->o_gid;
if (valid & OBD_MD_FLFLAGS)
dst->o_flags = src->o_flags;
- /*
- if (valid & OBD_MD_FLOBDFLG)
- dst->o_obdflags = src->o_obdflags;
- */
- if (valid & OBD_MD_FLNLINK)
- dst->o_nlink = src->o_nlink;
if (valid & OBD_MD_FLGENER)
dst->o_generation = src->o_generation;
- if (valid & OBD_MD_FLRDEV)
- dst->o_rdev = src->o_rdev;
if (valid & OBD_MD_FLINLINE &&
src->o_obdflags & OBD_FL_INLINEDATA) {
memcpy(dst->o_inline, src->o_inline, sizeof(src->o_inline));
RETURN(-EINVAL);
/* Temp fix to stop falling foul of osc_announce_cached() */
- oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLRDEV);
+ oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
memset(res, 0, sizeof(*res) * niocount);
/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp, int flags)
{
+ struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct filter_export_data *fed = &exp->exp_filter_data;
unsigned long irqflags;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
LASSERT(exp);
+
+ /* XXX should this go into filter_destroy_export() instead? */
+ /* forget what this client had cached, I bet this needs to be
+ * matched with appropriate client behaviour in the face of
+ * disconnects */
+ spin_lock(&exp->exp_obd->obd_osfs_lock);
+ filter->fo_tot_cached -= fed->fed_cached;
+ filter->fo_tot_granted -= fed->fed_grant;
+ fed->fed_cached = 0;
+ fed->fed_grant = 0;
+ fed->fed_grant_sent = 0;
+ fed->fed_grant_waiting = 0;
+ spin_unlock(&exp->exp_obd->obd_osfs_lock);
+
ldlm_cancel_locks_for_export(exp);
spin_lock_irqsave(&exp->exp_lock, irqflags);
exp->exp_flags = flags;
spin_unlock_irqrestore(&exp->exp_lock, irqflags);
- fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb);
+ fsfilt_sync(exp->exp_obd, filter->fo_sb);
/* XXX cleanup preallocated inodes */
/* flush any remaining cancel messages out to the target */
RETURN(rc);
}
+/* debugging to make sure that nothing bad happens, can be turned off soon */
+static void filter_grant_sanity_check(struct obd_device *obd,
+ struct filter_obd *filter, __u64 maxsize)
+{
+ struct filter_export_data *fed;
+ struct obd_export *exp_pos;
+ obd_size tot_cached = 0, tot_granted = 0;
+
+ list_for_each_entry(exp_pos, &obd->obd_exports, exp_obd_chain) {
+ fed = &exp_pos->exp_filter_data;
+ LASSERT(fed->fed_cached <= maxsize);
+ LASSERT(fed->fed_grant <= maxsize);
+ tot_cached += fed->fed_cached;
+ tot_granted += fed->fed_grant;
+ }
+ LASSERT(tot_cached == filter->fo_tot_cached);
+ LASSERT(tot_granted == filter->fo_tot_granted);
+ LASSERT(tot_cached <= maxsize);
+ LASSERT(tot_granted <= maxsize);
+}
+
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
+ struct filter_obd *filter = &obd->u.filter;
+ __u64 cached;
+ int blockbits = filter->fo_sb->s_blocksize_bits;
+ int rc;
ENTRY;
- RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
+
+ /* at least try to account for cached pages. its still racey and
+ * might be under-reporting if clients haven't announced their
+ * caches with brw recently */
+ spin_lock(&obd->obd_osfs_lock);
+ rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
+ memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
+ filter_grant_sanity_check(obd, filter, osfs->os_blocks << blockbits);
+ cached = filter->fo_tot_cached + osfs->os_bsize - 1;
+ spin_unlock(&obd->obd_osfs_lock);
+
+ cached >>= blockbits;
+ if (cached > osfs->os_bavail) {
+ CERROR("cached "LPU64" > bavail "LPU64", clamping\n", cached,
+ osfs->os_bavail);
+ cached = osfs->os_bavail;
+ }
+ osfs->os_bavail -= cached;
+ osfs->os_bfree -= cached;
+
+ RETURN(rc);
}
static int filter_get_info(struct obd_export *exp, __u32 keylen,
#define FILTER_INCOMPAT_GROUPS 0x00000001
#define FILTER_INCOMPAT_SUPP (FILTER_INCOMPAT_GROUPS)
+#define FILTER_GRANT_CHUNK (2ULL*1024*1024)
+
/* Data stored per server at the head of the last_rcvd file. In le32 order.
* Try to keep this the same as mds_server_data so we might one day merge. */
struct filter_server_data {
#include <linux/module.h>
#include <linux/pagemap.h> // XXX kill me soon
#include <linux/version.h>
+#include <asm/div64.h>
#include <linux/obd_class.h>
#include <linux/lustre_fsfilt.h>
return lnb->rc;
}
+/* See if there are unallocated parts in given file region */
+static int filter_inode_has_holes(struct inode *inode, obd_size start,
+ int len)
+{
+ int j;
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+ sector_t (*fs_bmap)(struct address_space *,
+ sector_t);
+#else
+ int (*fs_bmap)(struct address_space *, long);
+#endif
+ fs_bmap = inode->i_mapping->a_ops->bmap;
+ if (fs_bmap) {
+ for (j = 0; j <= len ; j++) {
+ if (!fs_bmap(inode->i_mapping, start+j)) {
+ return 1;
+ }
+ }
+ return 0;
+ } else {
+ /* Return -1 in case that caller cares about bmap availability.
+ */
+ return -1;
+ }
+}
+
+/* Grab the dirty and seen grant announcements from the incoming obdo.
+ * We will later calculate the clients new grant and return it. */
+static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
+{
+ struct filter_export_data *fed;
+ struct obd_device *obd = exp->exp_obd;
+ obd_size client_cached;
+ ENTRY;
+
+ if (!oa || (oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
+ (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
+ if (oa)
+ oa->o_valid &= ~OBD_MD_FLGRANT;
+ EXIT;
+ return;
+ }
+
+ client_cached = oa->o_blocks;
+ fed = &exp->exp_filter_data;
+
+ if (client_cached > fed->fed_grant)
+ CERROR("client %s claims "LPU64" granted, > "LPU64" granted\n",
+ obd->obd_name, client_cached, fed->fed_grant);
+
+ spin_lock(&obd->obd_osfs_lock);
+ /* update our accounting now so that statfs takes it into account */
+ obd->u.filter.fo_tot_cached += client_cached - fed->fed_cached;
+ fed->fed_cached = client_cached;
+
+ /* Acknowledgement that the client has seen our published grant.
+ * If the client has met our shrinking target we can reuse its
+ * difference from the previous grant. It is reasonable to announce
+ * more dirty that cached as it tries to purge its previously granted
+ * dirty data down to its newly received target. */
+ if (fed->fed_grant_waiting && (oa->o_grant <= fed->fed_grant_sent)) {
+ if (fed->fed_grant_sent < fed->fed_grant) {
+ if (client_cached <= fed->fed_grant_sent) {
+ obd->u.filter.fo_tot_granted -=
+ fed->fed_grant - oa->o_grant;
+ CDEBUG(D_SUPER, "reduced grant from "LPU64" to "
+ LPU64", total grant now "LPU64"\n",
+ fed->fed_grant, oa->o_grant,
+ obd->u.filter.fo_tot_granted);
+ fed->fed_grant = oa->o_grant;
+ fed->fed_grant_waiting = 0;
+ }
+ } else {
+ fed->fed_grant_waiting = 0;
+ }
+ }
+ spin_unlock(&obd->obd_osfs_lock);
+ oa->o_valid &= ~(OBD_MD_FLGRANT|OBD_MD_FLBLOCKS);
+ EXIT;
+}
+
+/* Figure out how much space is available between what we've granted
+ * and what remains in the filesystem. Compensate for ext3 indirect
+ * block overhead when computing how much free space is left ungranted.
+ *
+ * Caller must hold obd_osfs_lock. */
+obd_size filter_grant_space_left(struct obd_export *exp)
+{
+ obd_size left = 0;
+ struct obd_device *obd = exp->exp_obd;
+ int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
+ /* XXX I disabled statfs caching as it only creates extra problems now.
+ -- green*/
+ unsigned long max_age = jiffies/* - HZ*/+1;
+ struct filter_export_data *fed = &exp->exp_filter_data;
+ int rc;
+
+restat:
+ rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, max_age);
+ if (rc) /* N.B. statfs can't really fail, just for correctness */
+ RETURN(0);
+
+ left = obd->obd_osfs.os_bavail << blockbits;
+ left -= (left >> (blockbits - 2)) + (left >> (2 * blockbits - 2));
+ /* We cannot afford having absolutely no space, we need some for
+ llog stuff */
+ if ( left >= PAGE_SIZE * 10)
+ left -= PAGE_SIZE * 10;
+ else
+ left = 0;
+
+ /* If fed->fed_grant_waiting is set, this means
+ obd->u.filter.fo_tot_granted does not represent actual granted
+ amount and client is supposedly actively shrinks its cache, so
+ no point in printing this warning */
+ if (left < obd->u.filter.fo_tot_granted && !fed->fed_grant_waiting)
+ CERROR("granted space "LPU64" more than available "LPU64"\n",
+ obd->u.filter.fo_tot_granted, left);
+
+ left -= min(left, obd->u.filter.fo_tot_granted);
+ if (left < FILTER_GRANT_CHUNK && time_after(jiffies,obd->obd_osfs_age)){
+ CDEBUG(D_SUPER, "fs has no space left and statfs too old\n");
+ max_age = jiffies;
+ goto restat;
+ }
+
+ CDEBUG(D_SUPER, "free: "LPU64" avail: "LPU64" grant left: "LPU64"\n",
+ obd->obd_osfs.os_bfree << blockbits,
+ obd->obd_osfs.os_bavail << blockbits, left);
+
+ return left;
+}
+
+/* When clients have dirtied as much space as they've been granted they
+ * fall through to sync writes. These sync writes haven't been expressed
+ * in grants and need to error with ENOSPC when there isn't room in the
+ * filesystem for them after grants are taken into account. However,
+ * writeback of the dirty data that was already granted space can write
+ * right on through. We have no need to stop writes that won't allocate
+ * new space, so we bmap to calculate how much this io is going to consume.
+ *
+ * Caller must hold obd_osfs_lock. */
+static int filter_check_space(struct obd_export *exp, int objcount,
+ struct fsfilt_objinfo *fso, int niocount,
+ struct niobuf_remote *rnb,
+ struct niobuf_local *lnb, obd_size *left,
+ obd_size *consumed, struct inode *inode)
+{
+ int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
+ obd_size bytes, ungranted = 0;
+ int i, rc = -ENOSPC, obj, n = 0;
+
+ *consumed = 0;
+
+ for (obj = 0; obj < objcount; obj++) {
+ for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
+ obd_size tmp;
+
+ bytes = rnb[n].len;
+ tmp = rnb[n].offset & (blocksize - 1);
+ bytes += tmp;
+ tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
+ if (tmp)
+ bytes += blocksize - tmp;
+
+ if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
+ *consumed += bytes;
+ rc = 0;
+ continue;
+ }
+ if (*left - *consumed >= bytes) {
+ /* if enough space, pretend it was granted */
+ exp->exp_obd->u.filter.fo_tot_granted += bytes;
+ exp->exp_filter_data.fed_grant += bytes;
+ *consumed += bytes;
+ *left -= bytes;
+ rc = 0;
+ continue;
+ }
+ spin_unlock(&exp->exp_obd->obd_osfs_lock);
+ if (!filter_inode_has_holes(inode,
+ rnb[n].offset >>
+ inode->i_blkbits,
+ rnb[n].len >>
+ inode->i_blkbits)) {
+ rc = 0;
+ } else {
+ rc = lnb[n].rc = -ENOSPC;
+ }
+ spin_lock(&exp->exp_obd->obd_osfs_lock);
+ if (rc)
+ goto leave;
+ }
+ }
+
+ CDEBUG((*consumed != 0 && ungranted != 0) ? D_ERROR : D_SUPER,
+ "consumed: "LPU64" ungranted: "LPU64"\n", *consumed, ungranted);
+
+ if (*consumed > exp->exp_filter_data.fed_grant)
+ CERROR("request sent from cache, but not enough grant ("LPU64
+ ","LPU64")\n", *consumed,
+ exp->exp_filter_data.fed_grant);
+leave:
+ return rc;
+}
+
+/* Calculate how much grant space to allocate to this client, based on how
+ * much space is currently free and how much of that is already granted.
+ *
+ * Caller must hold obd_osfs_lock. */
+static void filter_grant(struct obd_export *exp, struct obdo *oa,
+ obd_size left, obd_size from_grant)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_export_data *fed = &exp->exp_filter_data;
+ obd_size grant, extra;
+ int blockbits;
+
+ blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
+
+ /* if things go wrong conservatively try to clamp them from
+ * generating more dirty data until things are better on our end */
+ grant = fed->fed_cached;
+
+ extra = min(FILTER_GRANT_CHUNK, left / 2);
+
+ if (grant > fed->fed_grant) {
+ /* If client has screwed up, force basic grant until fixed */
+ CERROR("client %s cached more "LPU64" than granted "LPU64"\n",
+ exp->exp_client_uuid.uuid, fed->fed_cached,
+ fed->fed_grant);
+ grant = extra;
+ } else if (fed->fed_grant_waiting) {
+ /* KISS: only one grant change in flight at a time. We
+ * could move it in the "same direction" easily,
+ * but changing directions (e.g. grow then shrink
+ * before client ACKs) would be bad. */
+ grant = fed->fed_grant_sent;
+ } else {
+ /* grant will shrink or grow as client cache/extra changes */
+ grant = fed->fed_cached + extra;
+ }
+
+ /* If we've granted all we're willing, we have to revoke
+ * the grant covering what the client just wrote. */
+ if (left == 0) {
+ grant -= min(from_grant, grant);
+ }
+
+ if (!fed->fed_grant_waiting && grant + from_grant > left ) {
+ if (from_grant < left)
+ grant = left - from_grant;
+ else
+ grant = 0;
+ }
+
+ if (grant != fed->fed_grant) {
+ fed->fed_grant_waiting = 1;
+ fed->fed_grant_sent = grant;
+ if (grant > fed->fed_grant) {
+ obd->u.filter.fo_tot_granted += grant - fed->fed_grant;
+ fed->fed_grant = grant;
+ }
+ }
+
+ CDEBUG(D_SUPER,"cli %s cache:"LPU64" grant:"LPU64", granting:"LPU64"\n",
+ exp->exp_connection->c_remote_uuid.uuid, oa->o_blocks,
+ oa->o_grant, grant);
+ CDEBUG(D_SUPER, "fed sent:"LPU64" wt:%d grant:"LPU64"\n",
+ fed->fed_grant_sent, fed->fed_grant_waiting,
+ fed->fed_grant);
+ CDEBUG(D_SUPER, "tot cached:"LPU64" granted:"LPU64" num_exports: %d\n",
+ obd->u.filter.fo_tot_cached,
+ obd->u.filter.fo_tot_granted, obd->obd_num_exports);
+
+ oa->o_valid |= OBD_MD_FLGRANT;
+ oa->o_grant = grant;
+}
+
static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
struct niobuf_local *res,
struct obd_trans_info *oti)
{
+ struct obd_device *obd = exp->exp_obd;
struct obd_run_ctxt saved;
struct obd_ioobj *o;
struct niobuf_remote *rnb;
for (i = 0, o = obj; i < objcount; i++, o++) {
LASSERT(o->ioo_bufcnt);
- dentry = filter_oa2dentry(exp->exp_obd, oa);
+ dentry = filter_oa2dentry(obd, oa);
if (IS_ERR(dentry))
GOTO(cleanup, rc = PTR_ERR(dentry));
CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
(jiffies - now));
+ if (oa) {
+ spin_lock(&obd->obd_osfs_lock);
+ filter_grant(exp, oa, filter_grant_space_left(exp), 0);
+ spin_unlock(&obd->obd_osfs_lock);
+ }
+
for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
dentry = fso[i].fso_dentry;
inode = dentry->d_inode;
CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
(jiffies - now));
- lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
- tot_bytes);
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
while (lnb-- > res) {
rc = filter_finish_page_read(lnb);
if (rc) {
struct niobuf_local *res,
struct obd_trans_info *oti)
{
+ struct obd_device *obd = exp->exp_obd;
struct obd_run_ctxt saved;
- struct niobuf_remote *rnb;
- struct niobuf_local *lnb = NULL;
+ struct niobuf_remote *rnb = nb;
+ struct niobuf_local *lnb = res;
struct fsfilt_objinfo fso;
struct dentry *dentry;
int rc = 0, i, tot_bytes = 0;
+ obd_size consumed = 0, left;
unsigned long now = jiffies;
ENTRY;
LASSERT(objcount == 1);
LASSERT(obj->ioo_bufcnt > 0);
+ filter_grant_incoming(exp, oa);
+
memset(res, 0, niocount * sizeof(*res));
- push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
- dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
- obj->ioo_id);
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
+ dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id);
if (IS_ERR(dentry))
GOTO(cleanup, rc = PTR_ERR(dentry));
CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
(jiffies - now));
+ spin_lock(&obd->obd_osfs_lock);
+ left = filter_grant_space_left(exp);
+
+ rc = filter_check_space(exp, objcount, &fso, niocount, rnb, lnb,
+ &left, &consumed, dentry->d_inode);
+ if (oa)
+ filter_grant(exp, oa, left, consumed);
+
+ spin_unlock(&obd->obd_osfs_lock);
+
+ if (rc) {
+ f_dput(dentry);
+ GOTO(cleanup, rc);
+ }
+
for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
i++, lnb++, rnb++) {
+
+ /* If there were any granting failures, we should not have
+ come here */
+ LASSERT (lnb->rc == 0);
+
lnb->dentry = dentry;
lnb->offset = rnb->offset;
lnb->len = rnb->len;
CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
(jiffies - now));
- lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
- tot_bytes);
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes);
EXIT;
cleanup:
- pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
return rc;
}
spin_lock(&cli->cl_loi_list_lock);
cli->cl_dirty_max = (obd_count)val * 1024 * 1024;
+ osc_adjust_cache(cli);
spin_unlock(&cli->cl_loi_list_lock);
return count;
extern atomic_t osc_max_rpcs_in_flight;
extern atomic_t osc_max_pages_per_rpc;
+void osc_adjust_cache(struct client_obd *cli);
#ifdef __KERNEL__
return rc;
}
-static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa)
{
- obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+ obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
- LASSERT(!(body->oa.o_valid & bits));
+ LASSERT(!(oa->o_valid & bits));
- body->oa.o_valid |= bits;
- down(&cli->cl_dirty_sem);
- body->oa.o_blocks = cli->cl_dirty;
- body->oa.o_rdev = cli->cl_dirty_granted;
- up(&cli->cl_dirty_sem);
+ oa->o_valid |= bits;
+ spin_lock(&cli->cl_loi_list_lock);
+ oa->o_blocks = cli->cl_dirty;
+ oa->o_grant = cli->cl_dirty_granted;
+ spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
cli->cl_dirty, cli->cl_dirty_granted);
}
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
- if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+ if(!(body->oa.o_valid & OBD_MD_FLGRANT)) {
if (cli->cl_ost_can_grant) {
CDEBUG(D_INODE, "%s can't grant\n",
cli->cl_import->imp_target_uuid.uuid);
return;
}
- CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev);
- down(&cli->cl_dirty_sem);
- cli->cl_dirty_granted = body->oa.o_rdev;
- /* XXX check for over-run and wake up the io thread that
- * doesn't exist yet */
- up(&cli->cl_dirty_sem);
+ CDEBUG(D_SUPER, "got "LPU64" grant\n", body->oa.o_grant);
+ spin_lock(&cli->cl_loi_list_lock);
+ if (cli->cl_dirty_granted != body->oa.o_grant) {
+ cli->cl_dirty_granted = body->oa.o_grant;
+ osc_adjust_cache(cli);
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
}
/* We assume that the reason this OSC got a short read is because it read
LASSERT((void *)(niobuf - niocount) ==
lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
- osc_announce_cached(cli, body);
+ osc_announce_cached(cli, &body->oa);
spin_lock_irqsave(&req->rq_lock, flags);
req->rq_no_resend = 1;
spin_unlock_irqrestore(&req->rq_lock, flags);
struct osc_cache_waiter {
struct list_head ocw_entry;
wait_queue_head_t ocw_waitq;
+ int ocw_rc;
};
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
{
spin_unlock(&cli->cl_loi_list_lock);
RETURN(rc);
};
+
+static inline obd_size osc_cache_cap(struct client_obd *cli)
+{
+ if (cli->cl_ost_can_grant)
+ return min(cli->cl_dirty_granted, cli->cl_dirty_max);
+
+ return cli->cl_dirty_max;
+}
+void osc_adjust_cache(struct client_obd *cli)
+{
+ struct list_head *l, *tmp;
+ struct osc_cache_waiter *ocw;
+ obd_size cache_cap = osc_cache_cap(cli);
+
+ ENTRY;
+
+ list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ if (cli->cl_dirty + PAGE_SIZE > cache_cap &&
+ cache_cap >= PAGE_SIZE)
+ break;
+
+ ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
+ list_del_init(&ocw->ocw_entry);
+ if (cache_cap < PAGE_SIZE) {
+ /* "They" said we are starting synchronous operations, so
+ wakeup everybody waiting for pages in cache and make them
+ go away unsatisfied. */
+ ocw->ocw_rc = -EDQUOT;
+ } else {
+ cli->cl_dirty += PAGE_SIZE;
+ }
+ wake_up(&ocw->ocw_waitq);
+ }
+
+ EXIT;
+}
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
struct osc_async_page *oap)
{
int rc = 0;
ENTRY;
- /* XXX check for ost grants here as well.. for now we ignore them. */
- if (cli->cl_dirty_max < PAGE_SIZE)
+ if (osc_cache_cap(cli) < PAGE_SIZE)
RETURN(-EDQUOT);
/* if we fail this test then cl_dirty contains at least one page
* that will have to be completed after we release the lock */
- if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) {
+ if (cli->cl_dirty + PAGE_SIZE <= osc_cache_cap(cli)) {
/* account for ourselves */
cli->cl_dirty += PAGE_SIZE;
GOTO(out, rc = 0);
}
init_waitqueue_head(&ocw.ocw_waitq);
+ ocw.ocw_rc = 0;
list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
/* make sure that there are write rpcs in flight to wait for. this
l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
spin_lock(&cli->cl_loi_list_lock);
+ rc = ocw.ocw_rc;
if (!list_empty(&ocw.ocw_entry)) {
rc = -EINTR;
list_del(&ocw.ocw_entry);
return;
}
- if (list_empty(&cli->cl_cache_waiters)) {
+ /* If nobody waits for cache space or if we need to shrink it */
+ if (list_empty(&cli->cl_cache_waiters) ||
+ (cli->cl_dirty > osc_cache_cap(cli))) {
cli->cl_dirty -= PAGE_SIZE;
} else {
ocw = list_entry(cli->cl_cache_waiters.next,
static int osc_invalidate_import(struct obd_device *obd,
struct obd_import *imp)
{
+ struct client_obd *cli;
LASSERT(imp->imp_obd == obd);
/* this used to try and tear down queued pages, but it was
* not correctly implemented. We'll have to do it again once
* we call obd_invalidate_import() agian */
- LBUG();
+ /* XXX And we still need to do this */
+
+ /* Reset grants, too */
+ cli = &obd->u.cli;
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_ost_can_grant = cli->cl_dirty_granted = 0;
+ spin_unlock(&cli->cl_loi_list_lock);
+
RETURN(0);
}
GOTO(out, rc = -EFAULT);
}
- /* BUG 974: when we send back cache grants, don't clear this flag */
- body->oa.o_valid &= ~OBD_MD_FLRDEV;
-
ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
if (ioo == NULL) {
CERROR("Missing/short ioobj\n");
rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
ioo, npages, local_nb, &oti);
- repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
- memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+ if (rc == 0) {
+ repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+ memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
#if CHECKSUM_BULK
- if (rc == 0) {
repbody->oa.o_nlink = ost_checksum_bulk(desc);
repbody->oa.o_valid |= OBD_MD_FLCKSUM;
- }
#endif
+ }
out_bulk:
ptlrpc_free_bulk(desc);
GOTO(out, rc = -EFAULT);
}
- /* BUG 974: when we send back cache grants, don't clear this flag */
- body->oa.o_valid &= ~OBD_MD_FLRDEV;
-
LASSERT_REQSWAB(req, 1);
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
if (objcount == 0) {
__swab64s (&o->o_ctime);
__swab64s (&o->o_size);
__swab64s (&o->o_blocks);
- __swab64s (&o->o_rdev);
+ __swab64s (&o->o_grant);
__swab32s (&o->o_blksize);
__swab32s (&o->o_mode);
__swab32s (&o->o_uid);
LASSERT (sizeof (((struct obdo *)0)->o_size) == 8);
LASSERT (offsetof (struct obdo, o_blocks) == 48);
LASSERT (sizeof (((struct obdo *)0)->o_blocks) == 8);
- LASSERT (offsetof (struct obdo, o_rdev) == 56);
- LASSERT (sizeof (((struct obdo *)0)->o_rdev) == 8);
+ LASSERT (offsetof (struct obdo, o_grant) == 56);
+ LASSERT (sizeof (((struct obdo *)0)->o_grant) == 8);
LASSERT (offsetof (struct obdo, o_blksize) == 64);
LASSERT (sizeof (((struct obdo *)0)->o_blksize) == 4);
LASSERT (offsetof (struct obdo, o_mode) == 68);
ptlrpc_abort_inflight(imp);
-#if 0
obd_invalidate_import(obd, imp);
-#endif
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL);
}
sh sanityN.sh
mount | grep $MOUNT && sh llmountcleanup.sh
fi
+if [ "$OOSTEST" != "no" ]; then
+ sh oos.sh
+fi
--- /dev/null
+#!/bin/bash
+
+export NAME=${NAME:-local}
+export OSTSIZE=10000
+
+MOUNT=${MOUNT:-/mnt/lustre}
+TMP=${TMP:-/tmp}
+
+echo "mnt.."
+sh llmount.sh
+echo "done"
+
+SUCCESS=1
+
+FREESPACE=`df |grep $MOUNT|tr -s ' '|cut -d ' ' -f4`
+
+rm -f $TMP/oosfile
+dd if=/dev/zero of=$MOUNT/oosfile count=$[$FREESPACE + 1] bs=1k 2>$TMP/oosfile
+
+RECORDSOUT=`grep "records out" $TMP/oosfile|cut -d + -f1`
+
+[ -z "`grep "No space left on device" $TMP/oosfile`" ] && \
+ echo "failed:dd not return ENOSPC" && SUCCESS=0
+
+REMAINEDFREE=`df |grep $MOUNT|tr -s ' '|cut -d ' ' -f4`
+[ $[$FREESPACE - $REMAINEDFREE ] -lt $RECORDSOUT ] && \
+ echo "failed:the space written by dd not equal to available space" && \
+ SUCCESS=0 && echo "$FREESPACE - $REMAINEDFREE $RECORDSOUT"
+
+[ $REMAINEDFREE -gt 100 ] && \
+ echo "failed:too many space left $REMAINEDFREE and -ENOSPC returned" &&\
+ SUCCESS=0
+
+FILESIZE=`ls -l $MOUNT/oosfile|tr -s ' '|cut -d ' ' -f5`
+[ $RECORDSOUT -ne $[$FILESIZE/1024] ] && \
+ echo "failed:the space written by dd not equal to the size of file" && \
+ SUCCESS=0
+
+[ $SUCCESS -eq 1 ] && echo "Success!"
+
+rm -f $MOUNT/oosfile*
+rm -f $TMP/oosfile
+
+echo ""
+echo "cln.."
+sh llmountcleanup.sh
}
test_45() {
f="$DIR/45"
+ # Obtain grants from OST if it supports it
+ echo blah > ${f}_grant
stop_kupdated
sync
do_dirty_record "echo blah > $f"
cmd = write ? OBD_IOC_BRW_WRITE : OBD_IOC_BRW_READ;
for (i = 1, next_count = verbose; i <= count; i++) {
- data.ioc_obdo1.o_valid &= ~(OBD_MD_FLBLOCKS|OBD_MD_FLRDEV);
+ data.ioc_obdo1.o_valid &= ~(OBD_MD_FLBLOCKS|OBD_MD_FLGRANT);
IOC_PACK(argv[0], data);
rc = l2_ioctl(OBD_DEV_ID, cmd, buf);
SHMEM_BUMP();
CHECK_MEMBER(obdo, o_ctime);
CHECK_MEMBER(obdo, o_size);
CHECK_MEMBER(obdo, o_blocks);
- CHECK_MEMBER(obdo, o_rdev);
+ CHECK_MEMBER(obdo, o_grant);
CHECK_MEMBER(obdo, o_blksize);
CHECK_MEMBER(obdo, o_mode);
CHECK_MEMBER(obdo, o_uid);
LASSERT((int)sizeof(((struct obdo *)0)->o_size) == 8);
LASSERT(offsetof(struct obdo, o_blocks) == 48);
LASSERT((int)sizeof(((struct obdo *)0)->o_blocks) == 8);
- LASSERT(offsetof(struct obdo, o_rdev) == 56);
- LASSERT((int)sizeof(((struct obdo *)0)->o_rdev) == 8);
+ LASSERT(offsetof(struct obdo, o_grant) == 56);
+ LASSERT((int)sizeof(((struct obdo *)0)->o_grant) == 8);
LASSERT(offsetof(struct obdo, o_blksize) == 64);
LASSERT((int)sizeof(((struct obdo *)0)->o_blksize) == 4);
LASSERT(offsetof(struct obdo, o_mode) == 68);