Whamcloud - gitweb
LU-1406 ofd: grant support
authorMikhail Pershin <tappro@whamcloud.com>
Tue, 22 May 2012 10:42:33 +0000 (14:42 +0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 12:37:44 +0000 (08:37 -0400)
Add grants functionality

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: I660508681e1e33b01dfbdb44ac32705575ee85e1
Reviewed-on: http://review.whamcloud.com/2871
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_export.h
lustre/ofd/Makefile.in
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_grant.c [new file with mode: 0644]
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c

index b424358..6bd4bdf 100644 (file)
@@ -114,6 +114,7 @@ struct filter_export_data {
         int                        fed_mod_count;/* items in fed_writing list */
         long                       fed_pending;  /* bytes just being written */
         __u32                      fed_group;
         int                        fed_mod_count;/* items in fed_writing list */
         long                       fed_pending;  /* bytes just being written */
         __u32                      fed_group;
+       __u8                       fed_pagesize; /* log2 of client page size */
 };
 
 struct mgs_export_data {
 };
 
 struct mgs_export_data {
index f43fda4..96e41a4 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := ofd
 
 ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o
 MODULES := ofd
 
 ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o
-ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o
+ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o
 
 EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h
 
 
 EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h
 
index 74dba8a..28f5ab4 100644 (file)
@@ -56,6 +56,90 @@ static int lprocfs_ofd_rd_groups(char *page, char **start, off_t off,
        return snprintf(page, count, "%u\n", ofd->ofd_max_group);
 }
 
        return snprintf(page, count, "%u\n", ofd->ofd_max_group);
 }
 
+/**
+ * procfs read handler printing the device-wide ofd_tot_dirty counter.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param start - not used by this handler
+ * \param off   - not used by this handler
+ * \param count - size limit passed to snprintf() for \a page
+ * \param eof   - set to 1, the whole value is produced by a single call
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+static int lprocfs_ofd_rd_tot_dirty(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+       struct obd_device *obd = (struct obd_device *)data;
+       struct ofd_device *ofd;
+
+       /* validate obd before it is dereferenced to find the OFD device */
+       LASSERT(obd != NULL);
+       ofd = ofd_dev(obd->obd_lu_dev);
+       *eof = 1;
+       return snprintf(page, count, LPU64"\n", ofd->ofd_tot_dirty);
+}
+
+/**
+ * procfs read handler printing the device-wide ofd_tot_granted counter.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param start - not used by this handler
+ * \param off   - not used by this handler
+ * \param count - size limit passed to snprintf() for \a page
+ * \param eof   - set to 1, the whole value is produced by a single call
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+static int lprocfs_ofd_rd_tot_granted(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+       struct obd_device *obd = (struct obd_device *)data;
+       struct ofd_device *ofd;
+
+       /* validate obd before it is dereferenced to find the OFD device */
+       LASSERT(obd != NULL);
+       ofd = ofd_dev(obd->obd_lu_dev);
+       *eof = 1;
+       return snprintf(page, count, LPU64"\n", ofd->ofd_tot_granted);
+}
+
+/**
+ * procfs read handler printing the device-wide ofd_tot_pending counter.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param start - not used by this handler
+ * \param off   - not used by this handler
+ * \param count - size limit passed to snprintf() for \a page
+ * \param eof   - set to 1, the whole value is produced by a single call
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+static int lprocfs_ofd_rd_tot_pending(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+       struct obd_device *obd = (struct obd_device *)data;
+       struct ofd_device *ofd;
+
+       /* validate obd before it is dereferenced to find the OFD device */
+       LASSERT(obd != NULL);
+       ofd = ofd_dev(obd->obd_lu_dev);
+       *eof = 1;
+       return snprintf(page, count, LPU64"\n", ofd->ofd_tot_pending);
+}
+
+/**
+ * procfs read handler printing fed_grant of the device's self export.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param count - size limit passed to snprintf() for \a page
+ * \param eof   - set to 1, the whole value is produced by a single call
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+static int lprocfs_ofd_rd_grant_precreate(char *page, char **start, off_t off,
+                                         int count, int *eof, void *data)
+{
+       struct obd_device               *obd = data;
+       struct filter_export_data       *self_fed;
+
+       LASSERT(obd != NULL);
+       self_fed = &obd->obd_self_export->exp_filter_data;
+       *eof = 1;
+       return snprintf(page, count, "%ld\n", self_fed->fed_grant);
+}
+
+/**
+ * procfs read handler printing the grant ratio as a percentage, i.e. the
+ * value of ofd_grant_reserved(ofd, 100) followed by a '%' sign.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param start - not used by this handler
+ * \param off   - not used by this handler
+ * \param count - size limit passed to snprintf() for \a page
+ * \param eof   - set to 1, the whole value is produced by a single call
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+static int lprocfs_ofd_rd_grant_ratio(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+       struct obd_device *obd = (struct obd_device *)data;
+       struct ofd_device *ofd;
+
+       /* validate obd before it is dereferenced to find the OFD device */
+       LASSERT(obd != NULL);
+       ofd = ofd_dev(obd->obd_lu_dev);
+       *eof = 1;
+       return snprintf(page, count, "%d%%\n",
+                       (int) ofd_grant_reserved(ofd, 100));
+}
+
+/**
+ * procfs write handler updating ofd_grant_ratio from a percentage.
+ *
+ * The value is parsed with lprocfs_write_helper(), must lie in [0, 100],
+ * is converted with ofd_grant_ratio_conv() and stored under ofd_grant_lock.
+ * A warning is logged for 0 and for values above 50.
+ *
+ * \param file   - not used by this handler
+ * \param buffer - text written to the procfs entry
+ * \param count  - number of bytes in \a buffer
+ * \param data   - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval \a count on success
+ * \retval error returned by lprocfs_write_helper() if parsing fails
+ * \retval -EINVAL if the value is outside [0, 100]
+ */
+static int lprocfs_ofd_wr_grant_ratio(struct file *file, const char *buffer,
+                                     unsigned long count, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       int                      percent;
+       int                      rc;
+
+       rc = lprocfs_write_helper(buffer, count, &percent);
+       if (rc != 0)
+               return rc;
+
+       if (percent < 0 || percent > 100)
+               return -EINVAL;
+
+       if (percent == 0) {
+               CWARN("%s: disabling grant error margin\n", obd->obd_name);
+       } else if (percent > 50) {
+               CWARN("%s: setting grant error margin >50%%, be warned that "
+                     "a huge part of the free space is now reserved for "
+                     "grants\n", obd->obd_name);
+       }
+
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+       ofd->ofd_grant_ratio = ofd_grant_ratio_conv(percent);
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+
+       return count;
+}
+
 static int lprocfs_ofd_rd_last_id(char *page, char **start, off_t off,
                                  int count, int *eof, void *data)
 {
 static int lprocfs_ofd_rd_last_id(char *page, char **start, off_t off,
                                  int count, int *eof, void *data)
 {
@@ -303,6 +387,39 @@ int lprocfs_ofd_wr_sync_lock_cancel(struct file *file, const char *buffer,
        return count;
 }
 
        return count;
 }
 
+/**
+ * procfs read handler printing the ofd_grant_compat_disable flag.
+ *
+ * \param page  - output buffer the value is formatted into
+ * \param count - size limit passed to snprintf() for \a page
+ * \param data  - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval value returned by snprintf()
+ */
+int lprocfs_ofd_rd_grant_compat_disable(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+
+       return snprintf(page, count, "%u\n", ofd->ofd_grant_compat_disable);
+}
+
+/**
+ * procfs write handler setting the ofd_grant_compat_disable flag.
+ *
+ * The written value is parsed with lprocfs_write_helper(); any non-zero,
+ * non-negative value sets the flag to 1 and zero clears it. The flag is
+ * updated under ofd_flags_lock.
+ *
+ * \param file   - not used by this handler
+ * \param buffer - text written to the procfs entry
+ * \param count  - number of bytes in \a buffer
+ * \param data   - opaque pointer, interpreted as a struct obd_device
+ *
+ * \retval \a count on success
+ * \retval error returned by lprocfs_write_helper() if parsing fails
+ * \retval -EINVAL if the value is negative
+ */
+int lprocfs_ofd_wr_grant_compat_disable(struct file *file, const char *buffer,
+                                       unsigned long count, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       int                      flag;
+       int                      rc;
+
+       rc = lprocfs_write_helper(buffer, count, &flag);
+       if (rc != 0)
+               return rc;
+       if (flag < 0)
+               return -EINVAL;
+
+       cfs_spin_lock(&ofd->ofd_flags_lock);
+       ofd->ofd_grant_compat_disable = (flag != 0);
+       cfs_spin_unlock(&ofd->ofd_flags_lock);
+
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "uuid",                lprocfs_rd_uuid, 0, 0 },
        { "blocksize",           lprocfs_rd_blksize, 0, 0 },
 static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "uuid",                lprocfs_rd_uuid, 0, 0 },
        { "blocksize",           lprocfs_rd_blksize, 0, 0 },
@@ -314,6 +431,12 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "filegroups",          lprocfs_ofd_rd_groups, 0, 0 },
        { "fstype",              lprocfs_ofd_rd_fstype, 0, 0 },
        { "last_id",             lprocfs_ofd_rd_last_id, 0, 0 },
        { "filegroups",          lprocfs_ofd_rd_groups, 0, 0 },
        { "fstype",              lprocfs_ofd_rd_fstype, 0, 0 },
        { "last_id",             lprocfs_ofd_rd_last_id, 0, 0 },
+       { "tot_dirty",           lprocfs_ofd_rd_tot_dirty,   0, 0 },
+       { "tot_pending",         lprocfs_ofd_rd_tot_pending, 0, 0 },
+       { "tot_granted",         lprocfs_ofd_rd_tot_granted, 0, 0 },
+       { "grant_precreate",     lprocfs_ofd_rd_grant_precreate, 0, 0 },
+       { "grant_ratio",         lprocfs_ofd_rd_grant_ratio,
+                                lprocfs_ofd_wr_grant_ratio, 0, 0 },
        { "recovery_status",     lprocfs_obd_rd_recovery_status, 0, 0 },
        { "recovery_time_soft",  lprocfs_obd_rd_recovery_time_soft,
                                 lprocfs_obd_wr_recovery_time_soft, 0},
        { "recovery_status",     lprocfs_obd_rd_recovery_status, 0, 0 },
        { "recovery_time_soft",  lprocfs_obd_rd_recovery_time_soft,
                                 lprocfs_obd_wr_recovery_time_soft, 0},
@@ -331,6 +454,8 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "instance",            lprocfs_target_rd_instance, 0 },
        { "ir_factor",           lprocfs_obd_rd_ir_factor,
                                 lprocfs_obd_wr_ir_factor, 0},
        { "instance",            lprocfs_target_rd_instance, 0 },
        { "ir_factor",           lprocfs_obd_rd_ir_factor,
                                 lprocfs_obd_wr_ir_factor, 0},
+       { "grant_compat_disable", lprocfs_ofd_rd_grant_compat_disable,
+                                 lprocfs_ofd_wr_grant_compat_disable, 0 },
        { "client_cache_count",  lprocfs_ofd_rd_fmd_max_num,
                                 lprocfs_ofd_wr_fmd_max_num, 0 },
        { "client_cache_seconds", lprocfs_ofd_rd_fmd_max_age,
        { "client_cache_count",  lprocfs_ofd_rd_fmd_max_num,
                                 lprocfs_ofd_wr_fmd_max_num, 0 },
        { "client_cache_seconds", lprocfs_ofd_rd_fmd_max_age,
index ee9c226..c0e3f8f 100644 (file)
@@ -306,6 +306,13 @@ static int ofd_recovery_complete(const struct lu_env *env,
 
        ENTRY;
 
 
        ENTRY;
 
+       /* Grant space for object precreation on the self export.
+        * This initial reserved space (i.e. 20MB for zfs and 560KB for ldiskfs)
+        * is enough to create 20k objects. It is then adapted based on the
+        * precreate request size (see ofd_grant_create()).
+        */
+       ofd_grant_connect(env, dev->ld_obd->obd_self_export,
+                         OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace);
        rc = next->ld_ops->ldo_recovery_complete(env, next);
        RETURN(rc);
 }
        rc = next->ld_ops->ldo_recovery_complete(env, next);
        RETURN(rc);
 }
@@ -400,6 +407,7 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        const char              *dev = lustre_cfg_string(cfg, 0);
        struct ofd_thread_info  *info = NULL;
        struct obd_device       *obd;
        const char              *dev = lustre_cfg_string(cfg, 0);
        struct ofd_thread_info  *info = NULL;
        struct obd_device       *obd;
+       struct obd_statfs       *osfs;
        int                      rc;
 
        ENTRY;
        int                      rc;
 
        ENTRY;
@@ -423,7 +431,20 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_raid_degraded = 0;
        m->ofd_syncjournal = 0;
        ofd_slc_set(m);
        m->ofd_raid_degraded = 0;
        m->ofd_syncjournal = 0;
        ofd_slc_set(m);
-
+       m->ofd_grant_compat_disable = 0;
+
+       /* statfs data */
+       cfs_spin_lock_init(&m->ofd_osfs_lock);
+       m->ofd_osfs_age = cfs_time_shift_64(-1000);
+       m->ofd_osfs_unstable = 0;
+       m->ofd_statfs_inflight = 0;
+       m->ofd_osfs_inflight = 0;
+
+       /* grant data */
+       cfs_spin_lock_init(&m->ofd_grant_lock);
+       m->ofd_tot_dirty = 0;
+       m->ofd_tot_granted = 0;
+       m->ofd_tot_pending = 0;
        m->ofd_max_group = 0;
 
        cfs_rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        m->ofd_max_group = 0;
 
        cfs_rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
@@ -473,6 +494,21 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
                GOTO(err_lu_site, rc);
        }
 
                GOTO(err_lu_site, rc);
        }
 
+       /* populate cached statfs data */
+       osfs = &ofd_info(env)->fti_u.osfs;
+       rc = ofd_statfs_internal(env, m, osfs, 0, NULL);
+       if (rc != 0) {
+               CERROR("%s: can't get statfs data, rc %d\n", obd->obd_name, rc);
+               GOTO(err_fini_stack, rc);
+       }
+       if (!IS_PO2(osfs->os_bsize)) {
+               CERROR("%s: blocksize (%d) is not a power of 2\n",
+                               obd->obd_name, osfs->os_bsize);
+               GOTO(err_fini_stack, rc = -EPROTO);
+       }
+       m->ofd_blockbits = cfs_fls(osfs->os_bsize) - 1;
+
+       snprintf(info->fti_u.name, sizeof(info->fti_u.name), "filter-%p", m);
        m->ofd_namespace = ldlm_namespace_new(obd, info->fti_u.name,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
        m->ofd_namespace = ldlm_namespace_new(obd, info->fti_u.name,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
@@ -484,6 +520,14 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
 
        dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf);
 
 
        dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf);
 
+       /* Allow at most ddp_grant_reserved% of the available filesystem space
+        * to be granted to clients, so that any errors in the grant overhead
+        * calculations do not allow granting more space to clients than can be
+        * written. Assumes that in aggregate the grant overhead calculations do
+        * not have more than ddp_grant_reserved% estimation error in them. */
+       m->ofd_grant_ratio =
+               ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
+
        rc = ofd_start(env, &m->ofd_dt_dev.dd_lu_dev);
        if (rc)
                GOTO(err_fini_stack, rc);
        rc = ofd_start(env, &m->ofd_dt_dev.dd_lu_dev);
        if (rc)
                GOTO(err_fini_stack, rc);
diff --git a/lustre/ofd/ofd_grant.c b/lustre/ofd/ofd_grant.c
new file mode 100644 (file)
index 0000000..6d85215
--- /dev/null
@@ -0,0 +1,1062 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_grant.c
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+#define OFD_GRANT_CHUNK (2ULL * PTLRPC_MAX_BRW_SIZE) /* default grant chunk */
+#define OFD_GRANT_SHRINK_LIMIT (16ULL * OFD_GRANT_CHUNK) /* per-client limit */
+
+/**
+ * Translate a grant amount reported by a client into server-side units.
+ *
+ * \param exp - export of the client the value comes from
+ * \param ofd - OFD device, provides ofd_blockbits
+ * \param val - amount as seen by the client
+ *
+ * \retval \a val shifted left by (ofd_blockbits - COMPAT_BSIZE_SHIFT) when
+ *         ofd_grant_compat() is true for the export, \a val otherwise
+ */
+static inline obd_size ofd_grant_from_cli(struct obd_export *exp,
+                                         struct ofd_device *ofd, obd_size val)
+{
+       if (!ofd_grant_compat(exp, ofd))
+               return val;
+
+       /* clients not supporting OBD_CONNECT_GRANT_PARAM actually consume
+        * 4KB of grant per block, so the grant counters are inflated to
+        * reflect what was actually consumed */
+       return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
+}
+
+/**
+ * Translate a server-side grant amount into the units used by a client;
+ * this is the inverse shift of ofd_grant_from_cli().
+ *
+ * \param exp - export of the client the value is meant for
+ * \param ofd - OFD device, provides ofd_blockbits
+ * \param val - amount in server-side units
+ *
+ * \retval \a val shifted right by (ofd_blockbits - COMPAT_BSIZE_SHIFT) when
+ *         ofd_grant_compat() is true for the export, \a val otherwise
+ */
+static inline obd_size ofd_grant_to_cli(struct obd_export *exp,
+                                       struct ofd_device *ofd, obd_size val)
+{
+       if (!ofd_grant_compat(exp, ofd))
+               return val;
+
+       return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
+}
+
+/**
+ * Return the grant chunk size to use for an export.
+ *
+ * \param exp - export to size the chunk for, may be NULL
+ * \param ofd - OFD device
+ *
+ * \retval OST_MAX_PRECREATE * ddp_inodespace for the self export
+ * \retval PTLRPC_MAX_BRW_SIZE inflated by the block size shift for exports
+ *         for which ofd_grant_compat() is true
+ * \retval OFD_GRANT_CHUNK in all other cases, including a NULL export
+ */
+static inline obd_size ofd_grant_chunk(struct obd_export *exp,
+                                      struct ofd_device *ofd)
+{
+       if (exp == NULL)
+               return OFD_GRANT_CHUNK;
+
+       /* Grant enough space to handle a big precreate request */
+       if (ofd_obd(ofd)->obd_self_export == exp)
+               return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace;
+
+       /* Try to grant enough space to send a full-size RPC */
+       if (ofd_grant_compat(exp, ofd))
+               return PTLRPC_MAX_BRW_SIZE <<
+                      (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
+
+       return OFD_GRANT_CHUNK;
+}
+
+/**
+ * Perform extra sanity checks for grant accounting. This is done at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
+ * always get rid of it or turn it off when we know accounting is good.
+ *
+ * The per-export fed_grant, fed_pending and fed_dirty counters of every
+ * export on obd_exports are summed up and compared with the device-wide
+ * ofd_tot_granted, ofd_tot_pending and ofd_tot_dirty counters. Mismatches
+ * are reported with CERROR(). A single export whose counters exceed the
+ * device size (os_blocks << ofd_blockbits) triggers LBUG().
+ * Nothing is checked when the export list is empty or when the device has
+ * more than 100 exports.
+ *
+ * \param obd - is the device to check
+ * \param func - is the name of the calling function, printed as prefix of
+ *              the error messages when an inconsistency is found
+ */
+void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
+{
+       struct filter_export_data       *fed;
+       struct ofd_device               *ofd = ofd_dev(obd->obd_lu_dev);
+       struct obd_export               *exp;
+       obd_size                         maxsize;
+       obd_size                         tot_dirty = 0;
+       obd_size                         tot_pending = 0;
+       obd_size                         tot_granted = 0;
+       obd_size                         fo_tot_dirty, fo_tot_pending;
+       obd_size                         fo_tot_granted;
+
+       if (cfs_list_empty(&obd->obd_exports))
+               return;
+
+       /* We don't want to do this for large machines that do lots of
+        * mounts or unmounts.  It burns...
+        * The export list is walked with two spinlocks held below. */
+       if (obd->obd_num_exports > 100)
+               return;
+
+       maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
+
+       cfs_spin_lock(&obd->obd_dev_lock);
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+       cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+               int error = 0; /* selects D_ERROR for the per-export dump */
+
+               fed = &exp->exp_filter_data;
+
+               if (obd->obd_self_export == exp)
+                       CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
+                              "%ld\n", obd->obd_name, fed->fed_grant,
+                              fed->fed_pending, fed->fed_dirty);
+
+               if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
+                   fed->fed_dirty < 0)
+                       error = 1;
+               if (fed->fed_grant + fed->fed_pending > maxsize) {
+                       CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
+                              " > maxsize("LPU64")\n", obd->obd_name,
+                              exp->exp_client_uuid.uuid, exp, fed->fed_grant,
+                              fed->fed_pending, maxsize);
+                       cfs_spin_unlock(&obd->obd_dev_lock);
+                       cfs_spin_unlock(&ofd->ofd_grant_lock);
+                       LBUG(); /* both locks are dropped before asserting */
+               }
+               if (fed->fed_dirty > maxsize) {
+                       CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
+                              ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
+                              exp, fed->fed_dirty, maxsize);
+                       cfs_spin_unlock(&obd->obd_dev_lock);
+                       cfs_spin_unlock(&ofd->ofd_grant_lock);
+                       LBUG(); /* both locks are dropped before asserting */
+               }
+               CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
+                            "%ld pend %ld grant %ld\n", obd->obd_name,
+                            exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
+                            fed->fed_pending, fed->fed_grant);
+               tot_granted += fed->fed_grant + fed->fed_pending;
+               tot_pending += fed->fed_pending;
+               tot_dirty += fed->fed_dirty;
+       }
+       cfs_spin_unlock(&obd->obd_dev_lock);
+       fo_tot_granted = ofd->ofd_tot_granted; /* read under ofd_grant_lock */
+       fo_tot_pending = ofd->ofd_tot_pending;
+       fo_tot_dirty = ofd->ofd_tot_dirty;
+
+       if (tot_granted != fo_tot_granted)
+               CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
+                      func, tot_granted, fo_tot_granted);
+       if (tot_pending != fo_tot_pending)
+               CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
+                      func, tot_pending, fo_tot_pending);
+       if (tot_dirty != fo_tot_dirty)
+               CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
+                      func, tot_dirty, fo_tot_dirty);
+       if (tot_pending > tot_granted)
+               CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
+                      func, tot_pending, tot_granted);
+       if (tot_granted > maxsize)
+               CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
+                      func, tot_granted, maxsize);
+       if (tot_dirty > maxsize)
+               CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
+                      func, tot_dirty, maxsize);
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+}
+
+/**
+ * Get statfs information through ofd_statfs_internal(). Cached data not
+ * older than OBD_STATFS_CACHE_SECONDS is acceptable unless force is set, in
+ * which case fresh data is requested. The OSD layer is in charge of
+ * estimating data & metadata overhead.
+ *
+ * \param env - is the lu environment passed by the caller, its thread info
+ *             provides the obd_statfs buffer
+ * \param exp - export used to print client info in debug messages
+ * \param force - is used to force a refresh of statfs information
+ * \param from_cache - if not NULL, returns whether the statfs information
+ *                  are taken from cache; set to 0 when statfs fails
+ */
+static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
+                            int force, int *from_cache)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct ofd_device       *ofd = ofd_exp(exp);
+       struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
+       __u64                    max_age;
+       int                      rc;
+
+       if (force)
+               max_age = 0; /* get fresh statfs data */
+       else
+               max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
+
+       rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
+       if (unlikely(rc)) {
+               /* ofd_statfs_internal() is also called with a NULL
+                * from_cache, so the out parameter is optional and must not
+                * be dereferenced blindly */
+               if (from_cache != NULL)
+                       *from_cache = 0;
+               return;
+       }
+
+       CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
+              obd->obd_name, exp->exp_client_uuid.uuid, exp,
+              osfs->os_bfree << ofd->ofd_blockbits,
+              osfs->os_bavail << ofd->ofd_blockbits);
+}
+
+/**
+ * Figure out how much space is available on the backend filesystem.
+ * This is done by accessing cached statfs data previously populated by
+ * ofd_grant_statfs(), from which we withdraw the space already granted to
+ * clients and the reserved space.
+ * Caller must hold ofd_grant_lock spinlock; ofd_osfs_lock is taken inside
+ * to read the cached statfs data.
+ *
+ * \param exp - export which received the write request
+ *
+ * \retval 0 if the cached available space is smaller than ofd_tot_granted
+ * \retval otherwise the number of bytes left once ofd_tot_granted and the
+ *         reserved margin are withdrawn, rounded down to the block size
+ */
+static obd_size ofd_grant_space_left(struct obd_export *exp)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct ofd_device       *ofd = ofd_exp(exp);
+       obd_size                 tot_granted;
+       obd_size                 left, avail;
+       obd_size                 unstable;
+
+       ENTRY;
+       LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
+
+       cfs_spin_lock(&ofd->ofd_osfs_lock);
+       /* get available space from cached statfs data */
+       left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
+       unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
+       cfs_spin_unlock(&ofd->ofd_osfs_lock);
+
+       tot_granted = ofd->ofd_tot_granted;
+
+       if (left < tot_granted) {
+               int mask = (left + unstable <
+                           tot_granted - ofd->ofd_tot_pending) ?
+                           D_ERROR : D_CACHE; /* D_ERROR only if unstable and
+                                               * pending do not explain it */
+
+               CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
+                            LPU64" unstable "LPU64" pending "LPU64"\n",
+                            obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                            left, tot_granted, unstable,
+                            ofd->ofd_tot_pending);
+               RETURN(0);
+       }
+
+       avail = left; /* kept for the reserve computation and debug message */
+       /* Withdraw space already granted to clients */
+       left -= tot_granted;
+
+       /* If the left space is below the grant threshold x available space,
+        * stop granting space to clients.
+        * The purpose of this threshold is to keep some error margin on the
+        * overhead estimate made by the OSD layer. If we grant all the free
+        * space, we have no way (grant space cannot be revoked yet) to
+        * adjust if the write overhead has been underestimated.
+        * min_t() caps the reserve at left, so the subtraction cannot
+        * underflow. */
+       left -= min_t(obd_size, left, ofd_grant_reserved(ofd, avail));
+
+       /* Align left on block size (rounds down) */
+       left &= ~((1ULL << ofd->ofd_blockbits) - 1);
+
+       CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
+              LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
+              exp->exp_client_uuid.uuid, exp, avail, left, unstable,
+              tot_granted, ofd->ofd_tot_pending);
+
+       RETURN(left);
+}
+
+/**
+ * Grab the dirty and seen grant announcements from the incoming obdo.
+ * We will later calculate the client's new grant and return it.
+ * Caller must hold ofd_grant_lock spinlock.
+ *
+ * If the obdo does not carry both OBD_MD_FLBLOCKS and OBD_MD_FLGRANT, the
+ * OBD_MD_FLGRANT bit is cleared and nothing else is done. Otherwise the
+ * export's fed_dirty and fed_grant and the device-wide ofd_tot_dirty and
+ * ofd_tot_granted counters are updated from o_dirty and o_dropped.
+ * LBUG() is raised (after dropping ofd_grant_lock) if a per-export counter
+ * ends up negative.
+ *
+ * \param env - is the lu environment (not referenced by this function)
+ * \param exp - is the export for which we received the request
+ * \param oa - is the incoming obdo sent by the client
+ */
+static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
+                              struct obdo *oa)
+{
+       struct filter_export_data       *fed;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct obd_device               *obd = exp->exp_obd;
+       long                             dirty, dropped, grant_chunk;
+       ENTRY;
+
+       LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
+
+       if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
+                                       (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
+               oa->o_valid &= ~OBD_MD_FLGRANT;
+               RETURN_EXIT;
+       }
+
+       fed = &exp->exp_filter_data;
+
+       /* Add some margin, since there is a small race if other RPCs arrive
+        * out-of-order and have already consumed some grant.  We want to
+        * leave this here in case there is a large error in accounting. */
+       /* fed_grant is a signed long, print it with %ld */
+       CDEBUG(D_CACHE,
+              "%s: cli %s/%p reports grant "LPU64" dropped %u, local %ld\n",
+              obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
+              oa->o_dropped, fed->fed_grant);
+
+       /* a value with the sign bit set is treated as "nothing dirty" */
+       if ((long long)oa->o_dirty < 0)
+               oa->o_dirty = 0;
+
+       dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
+       dropped     = ofd_grant_from_cli(exp, ofd, (obd_size)oa->o_dropped);
+       grant_chunk = ofd_grant_chunk(exp, ofd);
+
+       /* Update our accounting now so that statfs takes it into account.
+        * Note that fed_dirty is only approximate and can become incorrect
+        * if RPCs arrive out-of-order.  No important calculations depend
+        * on fed_dirty however, but we must check sanity to not assert. */
+       if (dirty > fed->fed_grant + 4 * grant_chunk)
+               dirty = fed->fed_grant + 4 * grant_chunk;
+       ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
+       /* ignore a dropped amount larger than what this client was granted */
+       if (fed->fed_grant < dropped) {
+               CDEBUG(D_CACHE,
+                      "%s: cli %s/%p reports %ld dropped > grant %ld\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
+                      fed->fed_grant);
+               dropped = 0;
+       }
+       /* ... or larger than what was granted overall */
+       if (ofd->ofd_tot_granted < dropped) {
+               CERROR("%s: cli %s/%p reports %ld dropped > tot_grant "LPU64
+                      "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      dropped, ofd->ofd_tot_granted);
+               dropped = 0;
+       }
+       ofd->ofd_tot_granted -= dropped;
+       fed->fed_grant -= dropped;
+       fed->fed_dirty = dirty;
+
+       if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
+               CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+               /* drop the grant lock before asserting */
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+       EXIT;
+}
+
+/**
+ * Called when the client is able to release some grants. Proceed with the
+ * shrink request when there is less ungranted space remaining
+ * than the amount all of the connected clients would consume if they
+ * used their full grant, i.e. when left_space is below
+ * ofd_tot_granted_clients * OFD_GRANT_SHRINK_LIMIT. Otherwise the request
+ * is ignored and the obdo is left untouched.
+ * Caller must hold ofd_grant_lock spinlock.
+ *
+ * The amount released is capped at the export's current fed_grant so that
+ * neither fed_grant nor ofd_tot_granted can be driven below zero by the
+ * value sent by the client.
+ *
+ * \param exp - is the export for which we received the request
+ * \param oa - is the incoming obdo sent by the client; o_grant is reset to
+ *            0 when the shrink request is processed
+ * \param left_space - is the remaining free space with space already granted
+ *                  taken out
+ */
+static void ofd_grant_shrink(struct obd_export *exp,
+                            struct obdo *oa, obd_size left_space)
+{
+       struct filter_export_data       *fed;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct obd_device               *obd = exp->exp_obd;
+       long                             grant_shrink;
+
+       LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
+
+       if (left_space >= ofd->ofd_tot_granted_clients *
+                         OFD_GRANT_SHRINK_LIMIT)
+               return;
+
+       grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
+
+       fed = &exp->exp_filter_data;
+       /* never release more than this export currently holds, otherwise
+        * fed_grant would turn negative and ofd_tot_granted would underflow */
+       if (fed->fed_grant < grant_shrink) {
+               CDEBUG(D_CACHE,
+                      "%s: cli %s/%p wants %ld shrinked > grant %ld\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      grant_shrink, fed->fed_grant);
+               grant_shrink = fed->fed_grant;
+       }
+       fed->fed_grant       -= grant_shrink;
+       ofd->ofd_tot_granted -= grant_shrink;
+
+       CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
+              LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
+              exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
+
+       /* client has just released some grant, don't grant any space back */
+       oa->o_grant = 0;
+}
+
+/**
+ * Calculate how much space is required to write a given network buffer.
+ *
+ * The buffer is extended to block boundaries: the offset within its first
+ * block is added to its length and the sum is rounded up to a multiple of
+ * the block size. The block size is 1 << COMPAT_BSIZE_SHIFT when an export
+ * is given and ofd_grant_compat() is true for it, 1 << ofd_blockbits
+ * otherwise.
+ *
+ * \param exp - export the buffer belongs to, may be NULL
+ * \param ofd - OFD device, provides ofd_blockbits
+ * \param rnb - network buffer, rnb_offset and rnb_len are used
+ *
+ * \retval the block-aligned size, passed through ofd_grant_from_cli() when
+ *         an export is given
+ */
+static inline int ofd_grant_rnb_size(struct obd_export *exp,
+                                    struct ofd_device *ofd,
+                                    struct niobuf_remote *rnb)
+{
+       obd_size bsize, mask, size, tail;
+
+       bsize = (exp && ofd_grant_compat(exp, ofd)) ?
+               (1ULL << COMPAT_BSIZE_SHIFT) : (1ULL << ofd->ofd_blockbits);
+       mask = bsize - 1;
+
+       /* start from the offset inside the first block, then pad the end
+        * up to the next block boundary */
+       size = (rnb->rnb_offset & mask) + rnb->rnb_len;
+       tail = size & mask;
+       if (tail != 0)
+               size += bsize - tail;
+
+       if (exp == NULL)
+               return size;
+
+       /* Apply per-export peculiarities if one is given */
+       return ofd_grant_from_cli(exp, ofd, size);
+}
+
+
+/**
+ * When clients have dirtied as much space as they've been granted they
+ * fall through to sync writes.  These sync writes haven't been expressed
+ * in grants and need to error with ENOSPC when there isn't room in the
+ * filesystem for them after grants are taken into account.  However,
+ * writeback of the dirty data that was already granted space can write
+ * right on through.
+ * Caller must hold ofd_grant_lock spinlock.
+ *
+ * \param env - is the lu environment passed by the caller
+ * \param exp - is the export identifying the client which sent the RPC
+ * \param oa  - is the incoming obdo in which we should pack the
+ *           additional grant
+ * \param rnb - is the list of network buffers
+ * \param niocont - is the number of network buffers in the list
+ * \param left - is the remaining free space with space already granted
+ *            taken out
+ */
+static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
+                           struct obdo *oa, struct niobuf_remote *rnb,
+                           int niocount, obd_size *left)
+{
+       struct filter_export_data       *fed = &exp->exp_filter_data;
+       struct obd_device               *obd = exp->exp_obd;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       unsigned long                    ungranted = 0;
+       unsigned long                    granted = 0;
+       int                              i;
+       int                              resend = 0;
+       struct ofd_thread_info          *info = ofd_info(env);
+
+       ENTRY;
+
+       LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
+
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           (oa->o_flags & OBD_FL_RECOV_RESEND)) {
+               resend = 1;
+               CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
+                               "accounting\n");
+       }
+
+       for (i = 0; i < niocount; i++) {
+               int bytes;
+
+               if (obd->obd_recovering) {
+                       /* Replaying write. Grant info have been processed
+                        * already so no need to do any enforcement here.
+                        * It is worth noting that only bulk writes with all
+                        * rnbs having OBD_BRW_FROM_GRANT can be replayed.
+                        * If one page hasn't OBD_BRW_FROM_GRANT set, then
+                        * the whole bulk is written synchronously */
+                       if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
+                                rnb[i].rnb_flags |= OBD_BRW_GRANTED;
+                                continue;
+                       } else {
+                               CERROR("%s: cli %s is replaying OST_WRITE "
+                                      "while one rnb hasn't OBD_BRW_FROM_GRANT"
+                                      " set (0x%x)\n", exp->exp_obd->obd_name,
+                                       exp->exp_client_uuid.uuid,
+                                       rnb[i].rnb_flags);
+
+                       }
+               } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
+                          (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
+                       if (resend) {
+                               /* This is a recoverable resend so grant
+                                * information have already been processed */
+                               rnb[i].rnb_flags |= OBD_BRW_GRANTED;
+                               continue;
+                       }
+
+                       /* inflate consumed space if needed */
+                       bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
+                       if (fed->fed_grant < granted + bytes) {
+                               CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
+                                      "GRANT, real grant %lu idx %d\n",
+                                      exp->exp_obd->obd_name,
+                                      exp->exp_client_uuid.uuid, exp,
+                                      granted, bytes, fed->fed_grant, i);
+                       } else {
+                               granted += bytes;
+                               rnb[i].rnb_flags |= OBD_BRW_GRANTED;
+                               continue;
+                       }
+               }
+
+               /* Consume grant space on the server.
+                * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
+                * so that the required grant space isn't inflated. This is
+                * done on purpose since the server can deal with large block
+                * size, unlike some clients */
+               bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
+               if (*left > ungranted + bytes) {
+                       /* if enough space, pretend it was granted */
+                       ungranted += bytes;
+                       rnb[i].rnb_flags |= OBD_BRW_GRANTED;
+                       continue;
+               }
+
+               /* We can't check for already-mapped blocks here (make sense
+                * when backend filesystem does not use COW) as it requires
+                * dropping the grant lock.
+                * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
+                * to go through and verify if all of the blocks not marked
+                *  BRW_GRANTED are already mapped and we can ignore this error.
+                */
+               rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
+               CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
+                               exp->exp_obd->obd_name,
+                               exp->exp_client_uuid.uuid, exp, i, bytes);
+       }
+
+       /* record space used for the I/O, will be used in ofd_grant_commmit() */
+       /* Now substract what the clients has used already.  We don't subtract
+        * this from the tot_granted yet, so that other client's can't grab
+        * that space before we have actually allocated our blocks. That
+        * happens in ofd_grant_commit() after the writes are done. */
+       info->fti_used = granted + ungranted;
+       *left -= ungranted;
+       fed->fed_grant -= granted;
+       fed->fed_pending += info->fti_used;
+       ofd->ofd_tot_granted += ungranted;
+       ofd->ofd_tot_pending += info->fti_used;
+
+       CDEBUG(D_CACHE,
+              "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
+              "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
+              granted, ungranted, fed->fed_grant, fed->fed_dirty);
+
+       if (obd->obd_recovering)
+               /* don't update dirty accounting during recovery */
+               RETURN_EXIT;
+
+       if (fed->fed_dirty < granted) {
+               CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      granted, fed->fed_dirty);
+               granted = fed->fed_dirty;
+       }
+       ofd->ofd_tot_dirty -= granted;
+       fed->fed_dirty -= granted;
+
+       if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
+               CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+       EXIT;
+}
+
+/**
+ * Calculate how much grant space to return to client, based on how much space
+ * is currently free and how much of that is already granted.
+ * Caller must hold ofd_grant_lock spinlock.
+ *
+ * When some space is granted, it is added to both the export grant
+ * (fed_grant) and the device total (ofd_tot_granted) before returning.
+ *
+ * \param exp - is the export of the client which sent the request
+ * \param curgrant - is the current grant claimed by the client
+ * \param want - is how much grant space the client would like to have
+ * \param left - is the remaining free space with granted space taken out
+ *
+ * \retval 0 if no space is granted: granting is prohibited for this export,
+ *         the export has failed, no space is left, the request is larger
+ *         than 2GB, the client already holds enough grant, or the aligned
+ *         amount is zero
+ * \retval otherwise the amount of space granted, converted with
+ *         ofd_grant_to_cli()
+ */
+static long ofd_grant(struct obd_export *exp, obd_size curgrant,
+                     obd_size want, obd_size left)
+{
+       struct obd_device               *obd = exp->exp_obd;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct filter_export_data       *fed = &exp->exp_filter_data;
+       long                             grant_chunk;
+       obd_size                         grant;
+
+       ENTRY;
+
+       if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
+               RETURN(0);
+
+       if (want > 0x7fffffff) {
+               CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
+               RETURN(0);
+       }
+
+       /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
+        * size while the reality is different, so both values provided by
+        * the client are converted before being compared with the server
+        * side counters */
+       curgrant    = ofd_grant_from_cli(exp, ofd, curgrant);
+       want    = ofd_grant_from_cli(exp, ofd, want);
+       grant_chunk = ofd_grant_chunk(exp, ofd);
+
+       /* Grant some fraction of the client's requested grant space so that
+        * they are not always waiting for write credits (not all of it to
+        * avoid overgranting in face of multiple RPCs in flight).  This
+        * essentially will be able to control the OSC_MAX_RIF for a client.
+        *
+        * If we do have a large disparity between what the client thinks it
+        * has and what we think it has, don't grant very much and let the
+        * client consume its grant first.  Either it just has lots of RPCs
+        * in flight, or it was evicted and its grants will soon be used up. */
+       if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
+                  RETURN(0);
+
+       if (!obd->obd_recovering)
+               /* don't grant more than 1/8th of the remaining free space in
+                * one chunk */
+               left >>= 3;
+       grant = min(want, left);
+       /* align grant on block size (rounding down) */
+       grant &= ~((1ULL << ofd->ofd_blockbits) - 1);
+
+       if (!grant)
+               RETURN(0);
+
+       /* Allow >OFD_GRANT_CHUNK size when clients reconnect due to a
+        * server reboot. */
+       if ((grant > grant_chunk) && (!obd->obd_recovering))
+               grant = grant_chunk;
+
+       ofd->ofd_tot_granted += grant;
+       fed->fed_grant += grant;
+
+       if (fed->fed_grant < 0) {
+               CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
+                      obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      fed->fed_grant, want, curgrant);
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+
+       CDEBUG(D_CACHE,
+              "%s: cli %s/%p wants: "LPU64" current grant "LPU64
+              " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
+              exp, want, curgrant, grant);
+       CDEBUG(D_CACHE,
+              "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
+              " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
+              exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
+              obd->obd_num_exports);
+
+       RETURN(ofd_grant_to_cli(exp, ofd, grant));
+}
+
+/**
+ * Client connection or reconnection.
+ *
+ * Tries to grant \a want to the client and accounts the client in
+ * ofd_tot_granted_clients. Takes and releases ofd_grant_lock itself.
+ *
+ * \param env - is the lu environment provided by the caller
+ * \param exp - is the client's export which is reconnecting
+ * \param want - is how much the client would like to get
+ *
+ * \retval 0 for a read-only client or when granting is prohibited for this
+ *         export
+ * \retval otherwise the grant currently recorded for the export (fed_grant),
+ *         converted with ofd_grant_to_cli()
+ */
+long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
+                      obd_size want)
+{
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct filter_export_data       *fed = &exp->exp_filter_data;
+       obd_size                         left = 0;
+       long                             grant;
+       int                              from_cache;
+       int                              force = 0; /* can use cached data */
+
+       /* don't grant space to client with read-only access */
+       if ((exp->exp_connect_flags & OBD_CONNECT_RDONLY) ||
+           ofd_grant_prohibit(exp, ofd))
+               return 0;
+
+refresh:
+       ofd_grant_statfs(env, exp, force, &from_cache);
+
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+
+       /* Grab free space from cached info and take out space already granted
+        * to clients as well as reserved space */
+       left = ofd_grant_space_left(exp);
+
+       /* get fresh statfs data if we are short in ungranted space, i.e. less
+        * than 32 grant chunks are left. The lock is dropped for the refresh
+        * and force is set, so this is retried with fresh statfs data */
+       if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
+               force = 1;
+               goto refresh;
+       }
+
+       ofd_grant(exp, ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant),
+                 want, left);
+
+       /* return to client its current grant. The value returned by
+        * ofd_grant() above is not used: whatever it granted has been added
+        * to fed_grant, which is read here */
+       grant = ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant);
+       ofd->ofd_tot_granted_clients++;
+
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+
+       CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
+              LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+              exp, grant, want, left);
+
+       return grant;
+}
+
+/**
+ * Remove a client from the grant accounting totals.  We also remove
+ * the export from the obd device under the osfs and dev locks to ensure
+ * that the ofd_grant_sanity_check() calculations are always valid.
+ * The client should do something similar when it invalidates its import.
+ *
+ * NOTE(review): the body below only takes ofd_grant_lock and only resets
+ * the grant and dirty counters of the export; removing the export from the
+ * obd device is presumably done by the caller - confirm.
+ *
+ * \param exp - is the client's export to remove from grant accounting
+ */
+void ofd_grant_discard(struct obd_export *exp)
+{
+       struct obd_device               *obd = exp->exp_obd;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct filter_export_data       *fed = &exp->exp_filter_data;
+
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+       LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
+                "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
+                obd->obd_name, ofd->ofd_tot_granted,
+                exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+       ofd->ofd_tot_granted -= fed->fed_grant;
+       fed->fed_grant = 0;
+       LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
+                "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
+                obd->obd_name, ofd->ofd_tot_pending,
+                exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+       /* ofd_tot_pending is handled in ofd_grant_commit as bulk
+        * finishes, so the pending counters are only checked here and are
+        * left untouched */
+       LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
+                "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
+                obd->obd_name, ofd->ofd_tot_dirty,
+                exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+       ofd->ofd_tot_dirty -= fed->fed_dirty;
+       fed->fed_dirty = 0;
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+}
+
+/**
+ * Called at prepare time when handling read request. This function extracts
+ * incoming grant information from the obdo and processes the grant shrink
+ * request, if any.
+ *
+ * Nothing is done when \a oa is NULL or does not have OBD_MD_FLGRANT set.
+ * Takes and releases ofd_grant_lock itself.
+ *
+ * \param env - is the lu environment provided by the caller
+ * \param exp - is the export of the client which sent the request
+ * \param oa - is the incoming obdo sent by the client. Its o_grant field is
+ *          set to 0 when no shrink request is packed, otherwise it is left
+ *          to ofd_grant_shrink().
+ */
+void ofd_grant_prepare_read(const struct lu_env *env,
+                           struct obd_export *exp, struct obdo *oa)
+{
+       struct ofd_device       *ofd = ofd_exp(exp);
+       int                      do_shrink;
+       obd_size                 left = 0;
+
+       if (!oa)
+               return;
+
+       if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
+               /* The read request does not contain any grant
+                * information */
+               return;
+
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
+               /* To process grant shrink request, we need to know how much
+                * available space remains on the backend filesystem.
+                * Shrink requests are not so common, we always get fresh
+                * statfs information. */
+               ofd_grant_statfs(env, exp, 1, NULL);
+
+               /* protect all grant counters */
+               cfs_spin_lock(&ofd->ofd_grant_lock);
+
+               /* Grab free space from cached statfs data and take out space
+                * already granted to clients as well as reserved space */
+               left = ofd_grant_space_left(exp);
+
+               /* all set now to proceed with shrinking */
+               do_shrink = 1;
+       } else {
+               /* no grant shrinking request packed in the obdo and
+                * since we don't grant space back on reads, no point
+                * in running statfs, so just skip it and process
+                * incoming grant data directly. */
+               cfs_spin_lock(&ofd->ofd_grant_lock);
+               do_shrink = 0;
+       }
+
+       /* extract incoming grant information provided by the client */
+       ofd_grant_incoming(env, exp, oa);
+
+       /* unlike writes, we don't return grants back on reads unless a grant
+        * shrink request was packed and we decided to turn it down. */
+       if (do_shrink)
+               ofd_grant_shrink(exp, oa, left);
+       else
+               oa->o_grant = 0;
+
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+}
+
+/**
+ * Called at write prepare time to handle incoming grant, check that we have
+ * enough space and grant some space back to the client if possible.
+ *
+ * Takes and releases ofd_grant_lock itself. The lock is dropped and the
+ * statfs data fetched again when fresh statfs data or a sync is needed.
+ *
+ * \param env - is the lu environment provided by the caller
+ * \param exp - is the export of the client which sent the request
+ * \param oa - is the incoming obdo sent by the client. When OBD_MD_FLGRANT
+ *          is set, o_grant is either handled by ofd_grant_shrink() or set
+ *          to the value returned by ofd_grant().
+ * \param rnb - is the list of network buffers
+ * \param niocount - is the number of network buffers in the list
+ */
+void ofd_grant_prepare_write(const struct lu_env *env,
+                            struct obd_export *exp, struct obdo *oa,
+                            struct niobuf_remote *rnb, int niocount)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct ofd_device       *ofd = ofd_exp(exp);
+       obd_size                 left;
+       int                      from_cache;
+       int                      force = 0; /* can use cached data initially.
+                                            * Set to 1 once fresh statfs data
+                                            * has been requested and to 2
+                                            * once a sync has been run, so
+                                            * that the sync is tried only
+                                            * once */
+       int                      rc;
+
+       ENTRY;
+
+refresh:
+       /* get statfs information from OSD layer */
+       ofd_grant_statfs(env, exp, force, &from_cache);
+
+       cfs_spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
+
+       /* Grab free space from cached statfs data and take out space already
+        * granted to clients as well as reserved space */
+       left = ofd_grant_space_left(exp);
+
+       /* Get fresh statfs data if we are short in ungranted space */
+       if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
+                      obd->obd_name);
+               force = 1;
+               goto refresh;
+       }
+
+       /* When close to free space exhaustion, trigger a sync to force
+        * writeback cache to consume required space immediately and release as
+        * much space as possible. */
+       if (!obd->obd_recovering && force != 2 &&
+           left < ofd_grant_chunk(NULL, ofd)) {
+               bool from_grant = true;
+               int  i;
+
+               /* That said, it is worth running a sync only if some pages did
+                * not consume grant space on the client and could thus fail
+                * with ENOSPC later in ofd_grant_check() */
+               for (i = 0; i < niocount; i++)
+                       if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
+                               from_grant = false;
+
+               if (!from_grant) {
+                       /* at least one network buffer requires acquiring grant
+                        * space on the server */
+                       cfs_spin_unlock(&ofd->ofd_grant_lock);
+                       /* discard errors, at least we tried ...
+                        * rc is assigned but never checked */
+                       rc = dt_sync(env, ofd->ofd_osd);
+                       force = 2;
+                       goto refresh;
+               }
+       }
+
+       /* extract incoming grant information provided by the client */
+       ofd_grant_incoming(env, exp, oa);
+
+       /* check limit */
+       ofd_grant_check(env, exp, oa, rnb, niocount, &left);
+
+       if (!(oa->o_valid & OBD_MD_FLGRANT)) {
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               RETURN_EXIT;
+       }
+
+       /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
+        * grant space. */
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           (oa->o_flags & OBD_FL_SHRINK_GRANT))
+               ofd_grant_shrink(exp, oa, left);
+       else
+               /* grant more space back to the client if possible */
+               oa->o_grant = ofd_grant(exp, oa->o_grant, oa->o_undirty, left);
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+}
+
+/**
+ * Called during object precreation to consume grant space.
+ * More space is granted for precreation if possible.
+ *
+ * The space consumed is recorded in the thread info (fti_used) and added to
+ * the pending counters of the export and of the device. Takes and releases
+ * ofd_grant_lock itself.
+ *
+ * \param env - is the lu environment provided by the caller
+ * \param exp - is the export holding the grant space for precreation (= self
+ *           export currently)
+ * \param nr - is the number of objects the caller wants to create. It is
+ *          lowered on return when the space available only allows fewer
+ *          objects to be created.
+ *
+ * \retval 0 on success, and also, without any accounting, during recovery
+ *         or when ddp_inodespace is 0
+ * \retval -ENOSPC if there is not enough space to create any object
+ */
+int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
+{
+       struct ofd_thread_info          *info = ofd_info(env);
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct filter_export_data       *fed = &exp->exp_filter_data;
+       obd_size                         left = 0;
+       unsigned long                    wanted;
+
+       ENTRY;
+
+       info->fti_used = 0;
+
+       if (exp->exp_obd->obd_recovering ||
+           ofd->ofd_dt_conf.ddp_inodespace == 0)
+               /* don't enforce grant during recovery */
+               RETURN(0);
+
+       /* Update statfs data if required */
+       ofd_grant_statfs(env, exp, 1, NULL);
+
+       /* protect all grant counters */
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+
+       /* fail precreate request if there is not enough blocks available for
+        * writing, i.e. when the available blocks minus the grant of this
+        * export (converted to blocks) are below 1/1024th of the total number
+        * of blocks.
+        * NOTE(review): if os_bavail is unsigned, the subtraction wraps when
+        * the grant exceeds os_bavail - confirm this cannot happen.
+        * NOTE(review): the debug message prints os_bavail * os_blocks, which
+        * does not look like an amount of free space - confirm. */
+       if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
+           (ofd->ofd_osfs.os_blocks >> 10)) {
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
+                      ofd_obd(ofd)->obd_name,
+                      ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
+               RETURN(-ENOSPC);
+       }
+
+       /* Grab free space from cached statfs data and take out space
+        * already granted to clients as well as reserved space */
+       left = ofd_grant_space_left(exp);
+
+       /* compute how much space is required to handle the precreation
+        * request */
+       wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
+       if (wanted > fed->fed_grant + left) {
+               /* that's beyond what remains, adjust the number of objects that
+                * can be safely precreated */
+               wanted = fed->fed_grant + left;
+               *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
+               if (*nr == 0) {
+                       /* we really have no space any more for precreation,
+                        * fail the precreate request with ENOSPC */
+                       cfs_spin_unlock(&ofd->ofd_grant_lock);
+                       RETURN(-ENOSPC);
+               }
+               /* compute space needed for the new number of creations */
+               wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
+       }
+       LASSERT(wanted <= fed->fed_grant + left);
+
+       if (wanted <= fed->fed_grant) {
+               /* we've enough grant space to handle this precreate request */
+               fed->fed_grant -= wanted;
+       } else {
+               /* we need to take some space from the ungranted pool: the
+                * part not covered by the export grant is added to
+                * tot_granted and taken out of left */
+               ofd->ofd_tot_granted += wanted - fed->fed_grant;
+               left -= wanted - fed->fed_grant;
+               fed->fed_grant = 0;
+       }
+       info->fti_used = wanted;
+       fed->fed_pending += info->fti_used;
+       ofd->ofd_tot_pending += info->fti_used;
+
+       /* grant more space (twice as much as needed for this request) for
+        * precreate purpose if possible */
+       ofd_grant(exp, fed->fed_grant, wanted * 2, left);
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+       RETURN(0);
+}
+
+/**
+ * Called at commit time to update pending grant counter for writes in flight
+ *
+ * The amount released is the one stored in the thread info (fti_used).
+ * Nothing is done when it is 0. Otherwise it is subtracted from the pending
+ * counter of the export and from the granted and pending totals of the
+ * device, under ofd_grant_lock.
+ *
+ * \param env - is the lu environment provided by the caller
+ * \param exp - is the export of the client which sent the request
+ * \param rc - is the result of the I/O, the cached statfs data is only
+ *          updated when it is 0
+ */
+void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp,
+                     int rc)
+{
+       struct ofd_device       *ofd  = ofd_exp(exp);
+       struct ofd_thread_info  *info = ofd_info(env);
+       unsigned long            pending;
+
+       ENTRY;
+
+       /* get space accounted in tot_pending for the I/O, set in
+        * ofd_grant_check() */
+       pending = info->fti_used;
+       if (pending == 0)
+               RETURN_EXIT;
+
+       cfs_spin_lock(&ofd->ofd_grant_lock);
+       /* Don't update statfs data for errors raised before commit (e.g.
+        * bulk transfer failed, ...) since we know those writes have not been
+        * processed. For other errors hit during commit, we cannot really tell
+        * whether or not something was written, so we update statfs data.
+        * In any case, this should not be fatal since we always get fresh
+        * statfs data before failing a request with ENOSPC */
+       if (rc == 0) {
+               cfs_spin_lock(&ofd->ofd_osfs_lock);
+               /* Take pending out of cached statfs data, min_t() keeps
+                * os_bavail from going below zero */
+               ofd->ofd_osfs.os_bavail -= min_t(obd_size,
+                                                ofd->ofd_osfs.os_bavail,
+                                                pending >> ofd->ofd_blockbits);
+               if (ofd->ofd_statfs_inflight)
+                       /* someone is running statfs and want to be notified of
+                        * writes happening meanwhile */
+                       ofd->ofd_osfs_inflight += pending;
+               cfs_spin_unlock(&ofd->ofd_osfs_lock);
+       }
+
+       if (exp->exp_filter_data.fed_pending < pending) {
+               CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
+                      exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                      exp->exp_filter_data.fed_pending, pending);
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+       exp->exp_filter_data.fed_pending -= pending;
+
+       if (ofd->ofd_tot_granted < pending) {
+                CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
+                       "\n", exp->exp_obd->obd_name,
+                       exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
+                       pending);
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+       ofd->ofd_tot_granted -= pending;
+
+       if (ofd->ofd_tot_pending < pending) {
+                CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
+                       "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+                       exp, ofd->ofd_tot_pending, pending);
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+               LBUG();
+       }
+       ofd->ofd_tot_pending -= pending;
+       cfs_spin_unlock(&ofd->ofd_grant_lock);
+       EXIT;
+}
index db2a5b4..77559aa 100644 (file)
@@ -87,6 +87,39 @@ struct ofd_device {
        struct dt_object        *ofd_lastid_obj[OFD_MAX_GROUPS];
        cfs_spinlock_t           ofd_objid_lock;
 
        struct dt_object        *ofd_lastid_obj[OFD_MAX_GROUPS];
        cfs_spinlock_t           ofd_objid_lock;
 
+       /* protect all statfs-related counters */
+       cfs_spinlock_t           ofd_osfs_lock;
+       /* statfs optimization: we cache a bit  */
+       struct obd_statfs        ofd_osfs;
+       __u64                    ofd_osfs_age;
+       int                      ofd_blockbits;
+       /* writes which might be accounted twice in ofd_osfs.os_bavail */
+       obd_size                 ofd_osfs_unstable;
+
+       /* counters used during statfs update, protected by ofd_osfs_lock.
+        * record when some statfs refresh are in progress */
+       int                      ofd_statfs_inflight;
+       /* track writes completed while statfs refresh is underway.
+        * tracking is only effective when ofd_statfs_inflight is non-zero */
+       obd_size                 ofd_osfs_inflight;
+
+       /* grants: all values in bytes */
+       /* grant lock to protect all grant counters */
+       cfs_spinlock_t           ofd_grant_lock;
+       /* total amount of dirty data reported by clients in incoming obdo */
+       obd_size                 ofd_tot_dirty;
+       /* sum of filesystem space granted to clients for async writes */
+       obd_size                 ofd_tot_granted;
+       /* grant used by I/Os in progress (between prepare and commit) */
+       obd_size                 ofd_tot_pending;
+       /* free space threshold over which we stop granting space to clients
+        * ofd_grant_ratio is stored as a fixed-point fraction using
+        * OFD_GRANT_RATIO_SHIFT of the remaining free space, not in percentage
+        * values */
+       int                      ofd_grant_ratio;
+       /* number of clients using grants */
+       int                      ofd_tot_granted_clients;
+
        /* ofd mod data: ofd_device wide values */
        int                      ofd_fmd_max_num; /* per ofd ofd_mod_data */
        cfs_duration_t           ofd_fmd_max_age; /* time to fmd expiry */
        /* ofd mod data: ofd_device wide values */
        int                      ofd_fmd_max_num; /* per ofd ofd_mod_data */
        cfs_duration_t           ofd_fmd_max_age; /* time to fmd expiry */
@@ -96,7 +129,10 @@ struct ofd_device {
                                 /* sync journal on writes */
                                 ofd_syncjournal:1,
                                 /* sync on lock cancel */
                                 /* sync journal on writes */
                                 ofd_syncjournal:1,
                                 /* sync on lock cancel */
-                                ofd_sync_lock_cancel:2;
+                                ofd_sync_lock_cancel:2,
+                                /* shall we grant space to clients not
+                                 * supporting OBD_CONNECT_GRANT_PARAM? */
+                                ofd_grant_compat_disable:1;
 
        struct lu_site           ofd_site;
 };
 
        struct lu_site           ofd_site;
 };
@@ -149,6 +185,9 @@ struct ofd_thread_info {
        struct dt_object_format          fti_dof;
        struct lu_buf                    fti_buf;
        loff_t                           fti_off;
        struct dt_object_format          fti_dof;
        struct lu_buf                    fti_buf;
        loff_t                           fti_off;
+
+       /* Space used by the I/O, used by grant code */
+       unsigned long                    fti_used;
 };
 
 extern void target_recovery_fini(struct obd_device *obd);
 };
 
 extern void target_recovery_fini(struct obd_device *obd);
@@ -165,6 +204,9 @@ extern struct lu_context_key ofd_thread_key;
 
 /* ofd_obd.c */
 extern struct obd_ops ofd_obd_ops;
 
 /* ofd_obd.c */
 extern struct obd_ops ofd_obd_ops;
+int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
+                       struct obd_statfs *osfs, __u64 max_age,
+                       int *from_cache);
 
 /* ofd_fs.c */
 obd_id ofd_last_id(struct ofd_device *ofd, obd_seq seq);
 
 /* ofd_fs.c */
 obd_id ofd_last_id(struct ofd_device *ofd, obd_seq seq);
@@ -178,6 +220,60 @@ void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars);
 int lproc_ofd_attach_seqstat(struct obd_device *dev);
 extern struct file_operations ofd_per_nid_stats_fops;
 
 int lproc_ofd_attach_seqstat(struct obd_device *dev);
 extern struct file_operations ofd_per_nid_stats_fops;
 
+/* ofd_grant.c */
+#define OFD_GRANT_RATIO_SHIFT 8
+/* Scale \a bavail by the device's grant ratio.
+ *
+ * ofd->ofd_grant_ratio is used as a fixed-point fraction with
+ * OFD_GRANT_RATIO_SHIFT fractional bits: the product is shifted right by
+ * OFD_GRANT_RATIO_SHIFT, so the result is expressed in the same unit as
+ * \a bavail.
+ * NOTE(review): the ratio is presumably produced by ofd_grant_ratio_conv();
+ * confirm against the code that sets ofd_grant_ratio. */
+static inline __u64 ofd_grant_reserved(struct ofd_device *ofd, obd_size bavail)
+{
+       return (bavail * ofd->ofd_grant_ratio) >> OFD_GRANT_RATIO_SHIFT;
+}
+
+/* Convert \a percentage into a fixed-point fraction with
+ * OFD_GRANT_RATIO_SHIFT fractional bits.
+ *
+ * The value is (percentage << OFD_GRANT_RATIO_SHIFT) / 100 using integer
+ * division, so the result is truncated. With OFD_GRANT_RATIO_SHIFT == 8,
+ * 50 maps to 128 and 10 maps to 25. */
+static inline int ofd_grant_ratio_conv(int percentage)
+{
+       return (percentage << OFD_GRANT_RATIO_SHIFT) / 100;
+}
+
+/* Tell whether OBD_CONNECT_GRANT_PARAM is set in the export's connect flags.
+ *
+ * Returns 1 if the flag is set in exp->exp_connect_flags, 0 otherwise
+ * (the !! normalizes the masked value to 0/1). */
+static inline int ofd_grant_param_supp(struct obd_export *exp)
+{
+       return !!(exp->exp_connect_flags & OBD_CONNECT_GRANT_PARAM);
+}
+
+/* Blocksize used for client not supporting OBD_CONNECT_GRANT_PARAM.
+ * That's 4KB=2^12 which is the biggest block size known to work whatever
+ * the client's page size is. */
+#define COMPAT_BSIZE_SHIFT 12
+/* Tell whether \a exp must be handled in grant compatibility mode.
+ *
+ * Returns 1 only when all of the following hold, 0 otherwise:
+ *  - \a exp is not the obd's own self export;
+ *  - the device block size exceeds the compat block size
+ *    (ofd_blockbits > COMPAT_BSIZE_SHIFT);
+ *  - the export did not negotiate OBD_CONNECT_GRANT_PARAM. */
+static inline int ofd_grant_compat(struct obd_export *exp,
+                                  struct ofd_device *ofd)
+{
+       /* Clients which don't support OBD_CONNECT_GRANT_PARAM cannot handle
+        * a block size > page size and consume CFS_PAGE_SIZE of grant when
+        * dirtying a page regardless of the block size */
+       return !!(ofd_obd(ofd)->obd_self_export != exp &&
+                 ofd->ofd_blockbits > COMPAT_BSIZE_SHIFT &&
+                 !ofd_grant_param_supp(exp));
+}
+
+/* Tell whether granting space to \a exp is disallowed.
+ *
+ * Returns 1 when the export is in compatibility mode (see
+ * ofd_grant_compat()) and ofd->ofd_grant_compat_disable is set;
+ * returns 0 otherwise. */
+static inline int ofd_grant_prohibit(struct obd_export *exp,
+                                    struct ofd_device *ofd)
+{
+       /* When ofd_grant_compat_disable is set, we don't grant any space to
+        * clients not supporting OBD_CONNECT_GRANT_PARAM.
+        * Otherwise, space granted to such a client is inflated since it
+        * consumes CFS_PAGE_SIZE of grant space per block */
+       return !!(ofd_grant_compat(exp, ofd) && ofd->ofd_grant_compat_disable);
+}
+
+void ofd_grant_sanity_check(struct obd_device *obd, const char *func);
+long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
+                      obd_size want);
+void ofd_grant_discard(struct obd_export *exp);
+void ofd_grant_prepare_read(const struct lu_env *env, struct obd_export *exp,
+                           struct obdo *oa);
+void ofd_grant_prepare_write(const struct lu_env *env, struct obd_export *exp,
+                            struct obdo *oa, struct niobuf_remote *rnb,
+                            int niocount);
+void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp, int rc);
+int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr);
+
 /* ofd_fmd.c */
 int ofd_fmd_init(void);
 void ofd_fmd_exit(void);
 /* ofd_fmd.c */
 int ofd_fmd_init(void);
 void ofd_fmd_exit(void);
@@ -231,4 +327,9 @@ static inline void ofd_slc_set(struct ofd_device *ofd)
                ofd->ofd_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
 }
 
                ofd->ofd_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
 }
 
+/* niobuf_local has no rnb_ prefix in master */
+#define rnb_offset offset
+#define rnb_flags  flags
+#define rnb_len    len
+
 #endif /* _OFD_INTERNAL_H */
 #endif /* _OFD_INTERNAL_H */
index ad0373a..5eba90b 100644 (file)
@@ -154,6 +154,18 @@ static int ofd_parse_connect_data(const struct lu_env *env,
        else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN)
                RETURN(-EPROTO);
 
        else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN)
                RETURN(-EPROTO);
 
+       if (ofd_grant_param_supp(exp)) {
+               exp->exp_filter_data.fed_pagesize = data->ocd_blocksize;
+               /* ocd_{blocksize,inodespace} are log2 values */
+               data->ocd_blocksize  = ofd->ofd_blockbits;
+               data->ocd_inodespace = ofd->ofd_dt_conf.ddp_inodespace;
+               /* ocd_grant_extent is in 1K blocks */
+               data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10;
+       }
+
+       if (exp->exp_connect_flags & OBD_CONNECT_GRANT)
+               data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant);
+
        if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
                struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd;
                int                    index = lsd->lsd_ost_index;
        if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
                struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd;
                int                    index = lsd->lsd_ost_index;
@@ -319,16 +331,22 @@ out:
 
 static int ofd_obd_disconnect(struct obd_export *exp)
 {
 
 static int ofd_obd_disconnect(struct obd_export *exp)
 {
-       struct lu_env   env;
-       int             rc;
+       struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
+       struct lu_env            env;
+       int                      rc;
 
        ENTRY;
 
        LASSERT(exp);
        class_export_get(exp);
 
 
        ENTRY;
 
        LASSERT(exp);
        class_export_get(exp);
 
+       if (!(exp->exp_flags & OBD_OPT_FORCE))
+               ofd_grant_sanity_check(ofd_obd(ofd), __FUNCTION__);
+
        rc = server_disconnect_export(exp);
 
        rc = server_disconnect_export(exp);
 
+       ofd_grant_discard(exp);
+
        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                RETURN(rc);
        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                RETURN(rc);
@@ -369,6 +387,8 @@ static int ofd_init_export(struct obd_export *exp)
 
 static int ofd_destroy_export(struct obd_export *exp)
 {
 
 static int ofd_destroy_export(struct obd_export *exp)
 {
+       struct ofd_device *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
+
        if (exp->exp_filter_data.fed_pending)
                CERROR("%s: cli %s/%p has %lu pending on destroyed export"
                       "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
        if (exp->exp_filter_data.fed_pending)
                CERROR("%s: cli %s/%p has %lu pending on destroyed export"
                       "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
@@ -385,6 +405,21 @@ static int ofd_destroy_export(struct obd_export *exp)
 
        ofd_fmd_cleanup(exp);
 
 
        ofd_fmd_cleanup(exp);
 
+       /*
+        * discard grants once we're sure no more
+        * interaction with the client is possible
+        */
+       ofd_grant_discard(exp);
+       ofd_fmd_cleanup(exp);
+
+       if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) {
+               if (ofd->ofd_tot_granted_clients > 0)
+                       ofd->ofd_tot_granted_clients --;
+       }
+
+       if (!(exp->exp_flags & OBD_OPT_FORCE))
+               ofd_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
        LASSERT(cfs_list_empty(&exp->exp_filter_data.fed_mod_list));
        return 0;
 }
        LASSERT(cfs_list_empty(&exp->exp_filter_data.fed_mod_list));
        return 0;
 }
@@ -438,6 +473,11 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp,
                        CERROR("ofd update capability key failed: %d\n", rc);
        } else if (KEY_IS(KEY_MDS_CONN)) {
                rc = ofd_set_mds_conn(exp, val);
                        CERROR("ofd update capability key failed: %d\n", rc);
        } else if (KEY_IS(KEY_MDS_CONN)) {
                rc = ofd_set_mds_conn(exp, val);
+       } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+               struct ost_body *body = val;
+
+               /** handle grant shrink, similar to a read request */
+               ofd_grant_prepare_read(env, exp, &body->oa);
        } else {
                CERROR("%s: Unsupported key %s\n",
                       exp->exp_obd->obd_name, (char*)key);
        } else {
                CERROR("%s: Unsupported key %s\n",
                       exp->exp_obd->obd_name, (char*)key);
@@ -500,16 +540,85 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
 {
        int rc;
 
 {
        int rc;
 
-       rc = dt_statfs(env, ofd->ofd_osd, osfs);
-       if (unlikely(rc))
-               return rc;
-
+       cfs_spin_lock(&ofd->ofd_osfs_lock);
+       if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) {
+               obd_size unstable;
+
+               /* statfs data are too old, get up-to-date one.
+                * we must be cautious here since multiple threads might be
+                * willing to update statfs data concurrently and we must
+                * guarantee that cached statfs data are always consistent */
+
+               if (ofd->ofd_statfs_inflight == 0)
+                       /* clear inflight counter if no users, although it would
+                        * take a while to overflow this 64-bit counter ... */
+                       ofd->ofd_osfs_inflight = 0;
+               /* notify ofd_grant_commit() that we want to track writes
+                * completed as of now */
+               ofd->ofd_statfs_inflight++;
+               /* record value of inflight counter before running statfs to
+                * compute the diff once statfs is completed */
+               unstable = ofd->ofd_osfs_inflight;
+               cfs_spin_unlock(&ofd->ofd_osfs_lock);
+
+               /* statfs can sleep ... hopefully not for too long since we can
+                * call it fairly often as space fills up */
+               rc = dt_statfs(env, ofd->ofd_osd, osfs);
+               if (unlikely(rc))
+                       return rc;
+
+               cfs_spin_lock(&ofd->ofd_grant_lock);
+               cfs_spin_lock(&ofd->ofd_osfs_lock);
+               /* calculate how much space was written while we released the
+                * ofd_osfs_lock */
+               unstable = ofd->ofd_osfs_inflight - unstable;
+               ofd->ofd_osfs_unstable = 0;
+               if (unstable) {
+                       /* some writes completed while we were running statfs
+                        * w/o the ofd_osfs_lock. Those ones got added to
+                        * the cached statfs data that we are about to crunch.
+                        * Take them into account in the new statfs data */
+                       osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
+                                              unstable >> ofd->ofd_blockbits);
+                       /* However, we don't really know if those writes got
+                        * accounted in the statfs call, so tell
+                        * ofd_grant_space_left() there is some uncertainty
+                        * on the accounting of those writes.
+                        * The purpose is to prevent spurious error messages in
+                        * ofd_grant_space_left() since those writes might be
+                        * accounted twice. */
+                       ofd->ofd_osfs_unstable += unstable;
+               }
+               /* similarly, there is some uncertainty on write requests
+                * between prepare & commit */
+               ofd->ofd_osfs_unstable += ofd->ofd_tot_pending;
+               cfs_spin_unlock(&ofd->ofd_grant_lock);
+
+               /* finally update cached statfs data */
+               ofd->ofd_osfs = *osfs;
+               ofd->ofd_osfs_age = cfs_time_current_64();
+
+               ofd->ofd_statfs_inflight--; /* stop tracking */
+               if (ofd->ofd_statfs_inflight == 0)
+                       ofd->ofd_osfs_inflight = 0;
+               cfs_spin_unlock(&ofd->ofd_osfs_lock);
+
+               if (from_cache)
+                       *from_cache = 0;
+       } else {
+               /* use cached statfs data */
+               *osfs = ofd->ofd_osfs;
+               cfs_spin_unlock(&ofd->ofd_osfs_lock);
+               if (from_cache)
+                       *from_cache = 1;
+       }
        return 0;
 }
 
 static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
        return 0;
 }
 
 static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
+        struct obd_device      *obd = class_exp2obd(exp);
        struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
        int                      rc;
 
        struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
        int                      rc;
 
@@ -519,6 +628,36 @@ static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
        if (unlikely(rc))
                GOTO(out, rc);
 
        if (unlikely(rc))
                GOTO(out, rc);
 
+       /* at least try to account for cached pages.  it's still racy and
+        * might be under-reporting if clients haven't announced their
+        * caches with brw recently */
+
+       CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
+              " pending "LPU64" free "LPU64" avail "LPU64"\n",
+              ofd->ofd_tot_dirty, ofd->ofd_tot_granted, ofd->ofd_tot_pending,
+              osfs->os_bfree << ofd->ofd_blockbits,
+              osfs->os_bavail << ofd->ofd_blockbits);
+
+       osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
+                                ((ofd->ofd_tot_dirty + ofd->ofd_tot_pending +
+                                  osfs->os_bsize - 1) >> ofd->ofd_blockbits));
+
+       /* The QoS code on the MDS does not care about space reserved for
+        * precreate, so take it out. */
+       if (exp->exp_connect_flags & OBD_CONNECT_MDS) {
+               struct filter_export_data *fed;
+
+               fed = &obd->obd_self_export->exp_filter_data;
+               osfs->os_bavail -= min_t(obd_size, osfs->os_bavail,
+                                        fed->fed_grant >> ofd->ofd_blockbits);
+       }
+
+       ofd_grant_sanity_check(obd, __FUNCTION__);
+       CDEBUG(D_CACHE, LPU64" blocks: "LPU64" free, "LPU64" avail; "
+              LPU64" objects: "LPU64" free; state %x\n",
+              osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
+              osfs->os_files, osfs->os_ffree, osfs->os_state);
+
        if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC,
                                 ofd->ofd_lut.lut_lsd.lsd_ost_index))
                osfs->os_bfree = osfs->os_bavail = 2;
        if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC,
                                 ofd->ofd_lut.lut_lsd.lsd_ost_index))
                osfs->os_bfree = osfs->os_bavail = 2;
@@ -530,6 +669,19 @@ static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
        /* OS_STATE_READONLY can be set by OSD already */
        if (ofd->ofd_raid_degraded)
                osfs->os_state |= OS_STATE_DEGRADED;
        /* OS_STATE_READONLY can be set by OSD already */
        if (ofd->ofd_raid_degraded)
                osfs->os_state |= OS_STATE_DEGRADED;
+
+       if (obd->obd_self_export != exp && ofd_grant_compat(exp, ofd)) {
+               /* clients which don't support OBD_CONNECT_GRANT_PARAM
+                * should not see a block size > page size, otherwise
+                * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
+                * block size which is the biggest block size known to work
+                * with all clients' page sizes. */
+               osfs->os_blocks <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bfree  <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bavail <<= ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bsize    = 1 << COMPAT_BSIZE_SHIFT;
+       }
+
        EXIT;
 out:
        return rc;
        EXIT;
 out:
        return rc;