Whamcloud - gitweb
LU-7015 ofd: Fix wanted grant calculation
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32 /*
33  * lustre/ofd/ofd_grant.c
34  *
35  * This file provides code related to grant space management on Object Storage
36  * Targets (OSTs). Grant is a mechanism used by client nodes to reserve disk
37  * space on OSTs for the data writeback cache. The Lustre client is thus assured
38  * that enough space will be available when flushing dirty pages asynchronously.
39  * Each client node is granted an initial amount of reserved space at connect
40  * time and gets additional space back from OST in bulk write reply.
41  *
42  * This file handles the core logic for:
43  * - grant allocation strategy
44  * - maintaining per-client as well as global grant space accounting
45  * - processing grant information packed in incoming requests
46  * - allocating server-side grant space for synchronous write RPCs which did not
47  *   consume grant on the client side (OBD_BRW_FROM_GRANT flag not set). If not
48  *   enough space is available, such RPCs fail with ENOSPC
49  *
50  * Author: Johann Lombardi <johann.lombardi@intel.com>
51  */
52
53 #define DEBUG_SUBSYSTEM S_FILTER
54
55 #include "ofd_internal.h"
56
57 /* At least enough to send a couple of 1MB RPCs, even if not max sized */
58 #define OFD_GRANT_CHUNK                 (2ULL * DT_MAX_BRW_SIZE)
59
60 /* Clients typically hold 2x their max_rpcs_in_flight of grant space */
61 #define OFD_GRANT_SHRINK_LIMIT(exp)     (2ULL * 8 * exp_max_brw_size(exp))
62
63 static inline u64 ofd_grant_from_cli(struct obd_export *exp,
64                                      struct ofd_device *ofd, u64 val)
65 {
66         if (ofd_grant_compat(exp, ofd))
67                 /* clients not supporting OBD_CONNECT_GRANT_PARAM actually
68                  * consume 4KB of grant per block, we should thus inflate
69                  * the grant counters to reflect what was actually consumed */
70                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
71         return val;
72 }
73
74 static inline u64 ofd_grant_to_cli(struct obd_export *exp,
75                                    struct ofd_device *ofd, u64 val)
76 {
77         if (ofd_grant_compat(exp, ofd))
78                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
79         return val;
80 }
81
82 static inline u64 ofd_grant_chunk(struct obd_export *exp,
83                                   struct ofd_device *ofd)
84 {
85         if (ofd_obd(ofd)->obd_self_export == exp)
86                 /* Grant enough space to handle a big precreate request */
87                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
88
89         if (ofd_grant_compat(exp, ofd))
90                 /* Try to grant enough space to send a full-size RPC */
91                 return exp_max_brw_size(exp) <<
92                        (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
93
94         /* Try to return enough to send two full RPCs, if needed */
95         return exp_max_brw_size(exp) * 2;
96 }
97
98 /**
99  * Perform extra sanity checks for grant accounting.
100  *
101  * This function scans the export list, sanity checks per-export grant counters
102  * and verifies accuracy of global grant accounting. If an inconsistency is
103  * found, a CERROR is printed with the function name \func that was passed as
104  * argument. LBUG is only called in case of serious counter corruption (i.e.
105  * value larger than the device size).
106  * Those sanity checks can be pretty expensive and are disabled if the OBD
107  * device has more than 100 connected exports.
108  *
109  * \param[in] obd       OBD device for which grant accounting should be
110  *                      verified
111  * \param[in] func      caller's function name
112  */
113 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
114 {
115         struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
116         struct obd_export *exp;
117         u64                maxsize;
118         u64                tot_dirty = 0;
119         u64                tot_pending = 0;
120         u64                tot_granted = 0;
121         u64                fo_tot_granted;
122         u64                fo_tot_pending;
123         u64                fo_tot_dirty;
124
125         if (list_empty(&obd->obd_exports))
126                 return;
127
128         /* We don't want to do this for large machines that do lots of
129          * mounts or unmounts.  It burns... */
130         if (obd->obd_num_exports > 100)
131                 return;
132
133         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
134
135         spin_lock(&obd->obd_dev_lock);
136         spin_lock(&ofd->ofd_grant_lock);
137         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
138                 struct filter_export_data       *fed;
139                 int                              error = 0;
140
141                 fed = &exp->exp_filter_data;
142
143                 if (obd->obd_self_export == exp)
144                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
145                                "%ld\n", obd->obd_name, fed->fed_grant,
146                                fed->fed_pending, fed->fed_dirty);
147
148                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
149                     fed->fed_dirty < 0)
150                         error = 1;
151                 if (fed->fed_grant + fed->fed_pending > maxsize) {
152                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
153                                " > maxsize("LPU64")\n", obd->obd_name,
154                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
155                                fed->fed_pending, maxsize);
156                         spin_unlock(&obd->obd_dev_lock);
157                         spin_unlock(&ofd->ofd_grant_lock);
158                         LBUG();
159                 }
160                 if (fed->fed_dirty > maxsize) {
161                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
162                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
163                                exp, fed->fed_dirty, maxsize);
164                         spin_unlock(&obd->obd_dev_lock);
165                         spin_unlock(&ofd->ofd_grant_lock);
166                         LBUG();
167                 }
168                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
169                              "%ld pend %ld grant %ld\n", obd->obd_name,
170                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
171                              fed->fed_pending, fed->fed_grant);
172                 tot_granted += fed->fed_grant + fed->fed_pending;
173                 tot_pending += fed->fed_pending;
174                 tot_dirty += fed->fed_dirty;
175         }
176         spin_unlock(&obd->obd_dev_lock);
177         fo_tot_granted = ofd->ofd_tot_granted;
178         fo_tot_pending = ofd->ofd_tot_pending;
179         fo_tot_dirty = ofd->ofd_tot_dirty;
180
181         if (tot_granted != fo_tot_granted)
182                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
183                        func, tot_granted, fo_tot_granted);
184         if (tot_pending != fo_tot_pending)
185                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
186                        func, tot_pending, fo_tot_pending);
187         if (tot_dirty != fo_tot_dirty)
188                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
189                        func, tot_dirty, fo_tot_dirty);
190         if (tot_pending > tot_granted)
191                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
192                        func, tot_pending, tot_granted);
193         if (tot_granted > maxsize)
194                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
195                        func, tot_granted, maxsize);
196         if (tot_dirty > maxsize)
197                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
198                        func, tot_dirty, maxsize);
199         spin_unlock(&ofd->ofd_grant_lock);
200 }
201
202 /**
203  * Update cached statfs information from the OSD layer
204  *
205  * Refresh statfs information cached in ofd::ofd_osfs if the cache is older
206  * than 1s or if force is set. The OSD layer is in charge of estimating data &
207  * metadata overhead.
208  * This function can sleep so it should not be called with any spinlock held.
209  *
210  * \param[in] env               LU environment passed by the caller
211  * \param[in] exp               export used to print client info in debug
212  *                              messages
213  * \param[in] force             force a refresh of statfs information
214  * \param[out] from_cache       returns whether the statfs information are
215  *                              taken from cache
216  */
217 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
218                              int force, int *from_cache)
219 {
220         struct obd_device       *obd = exp->exp_obd;
221         struct ofd_device       *ofd = ofd_exp(exp);
222         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
223         __u64                    max_age;
224         int                      rc;
225
226         if (force)
227                 max_age = 0; /* get fresh statfs data */
228         else
229                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
230
231         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
232         if (unlikely(rc)) {
233                 if (from_cache)
234                         *from_cache = 0;
235                 return;
236         }
237
238         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
239                obd->obd_name, exp->exp_client_uuid.uuid, exp,
240                osfs->os_bfree << ofd->ofd_blockbits,
241                osfs->os_bavail << ofd->ofd_blockbits);
242 }
243
244 /**
245  * Figure out how much space is available on the backend filesystem after
246  * removing grant space already booked by clients.
247  *
248  * This is done by accessing cached statfs data previously populated by
249  * ofd_grant_statfs(), from which we withdraw the space already granted to
250  * clients and the reserved space.
251  * Caller must hold ofd_grant_lock spinlock.
252  *
253  * \param[in] exp       export associated with the device for which the amount
254  *                      of available space is requested
255  * \retval              amount of non-allocated space, in bytes
256  */
257 static u64 ofd_grant_space_left(struct obd_export *exp)
258 {
259         struct obd_device *obd = exp->exp_obd;
260         struct ofd_device *ofd = ofd_exp(exp);
261         u64                tot_granted;
262         u64                left;
263         u64                avail;
264         u64                unstable;
265
266         ENTRY;
267         assert_spin_locked(&ofd->ofd_grant_lock);
268
269         spin_lock(&ofd->ofd_osfs_lock);
270         /* get available space from cached statfs data */
271         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
272         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
273         spin_unlock(&ofd->ofd_osfs_lock);
274
275         tot_granted = ofd->ofd_tot_granted;
276
277         if (left < tot_granted) {
278                 int mask = (left + unstable <
279                             tot_granted - ofd->ofd_tot_pending) ?
280                             D_ERROR : D_CACHE;
281
282                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
283                              LPU64" unstable "LPU64" pending "LPU64" "
284                              "dirty "LPU64"\n",
285                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
286                              left, tot_granted, unstable,
287                              ofd->ofd_tot_pending, ofd->ofd_tot_dirty);
288                 RETURN(0);
289         }
290
291         avail = left;
292         /* Withdraw space already granted to clients */
293         left -= tot_granted;
294
295         /* If the left space is below the grant threshold x available space,
296          * stop granting space to clients.
297          * The purpose of this threshold is to keep some error margin on the
298          * overhead estimate made by the OSD layer. If we grant all the free
299          * space, we have no way (grant space cannot be revoked yet) to
300          * adjust if the write overhead has been underestimated. */
301         left -= min_t(u64, left, ofd_grant_reserved(ofd, avail));
302
303         /* Align left on block size */
304         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
305
306         CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
307                LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
308                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
309                tot_granted, ofd->ofd_tot_pending);
310
311         RETURN(left);
312 }
313
314 /**
315  * Process grant information from obdo structure packed in incoming BRW
316  *
317  * Grab the dirty and seen grant announcements from the incoming obdo.
318  * We will later calculate the client's new grant and return it.
319  * Caller must hold ofd_grant_lock spinlock.
320  *
321  * \param[in] env       LU environment supplying osfs storage
322  * \param[in] exp       export for which we received the request
323  * \param[in,out] oa    incoming obdo sent by the client
324  *
325  */
326 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
327                                struct obdo *oa)
328 {
329         struct filter_export_data       *fed;
330         struct ofd_device               *ofd = ofd_exp(exp);
331         struct obd_device               *obd = exp->exp_obd;
332         long                             dirty;
333         long                             dropped;
334         long                             grant_chunk;
335         ENTRY;
336
337         assert_spin_locked(&ofd->ofd_grant_lock);
338
339         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
340                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
341                 oa->o_valid &= ~OBD_MD_FLGRANT;
342                 RETURN_EXIT;
343         }
344
345         fed = &exp->exp_filter_data;
346
347         /* Add some margin, since there is a small race if other RPCs arrive
348          * out-or-order and have already consumed some grant.  We want to
349          * leave this here in case there is a large error in accounting. */
350         CDEBUG(D_CACHE,
351                "%s: cli %s/%p reports grant "LPU64" dropped %u, local %lu\n",
352                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
353                oa->o_dropped, fed->fed_grant);
354
355         if ((long long)oa->o_dirty < 0)
356                 oa->o_dirty = 0;
357
358         dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
359         dropped     = ofd_grant_from_cli(exp, ofd, (u64)oa->o_dropped);
360         grant_chunk = ofd_grant_chunk(exp, ofd);
361
362         /* Update our accounting now so that statfs takes it into account.
363          * Note that fed_dirty is only approximate and can become incorrect
364          * if RPCs arrive out-of-order.  No important calculations depend
365          * on fed_dirty however, but we must check sanity to not assert. */
366         if (dirty > fed->fed_grant + 4 * grant_chunk)
367                 dirty = fed->fed_grant + 4 * grant_chunk;
368         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
369         if (fed->fed_grant < dropped) {
370                 CDEBUG(D_CACHE,
371                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
372                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
373                        fed->fed_grant);
374                 dropped = 0;
375         }
376         if (ofd->ofd_tot_granted < dropped) {
377                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant "LPU64
378                        "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
379                        dropped, ofd->ofd_tot_granted);
380                 dropped = 0;
381         }
382         ofd->ofd_tot_granted -= dropped;
383         fed->fed_grant -= dropped;
384         fed->fed_dirty = dirty;
385
386         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
387                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
388                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
389                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
390                 spin_unlock(&ofd->ofd_grant_lock);
391                 LBUG();
392         }
393         EXIT;
394 }
395
396 /**
397  * Grant shrink request handler.
398  *
399  * Client nodes can explicitly release grant space (i.e. process called grant
400  * shrinking). This function proceeds with the shrink request when there is
401  * less ungranted space remaining than the amount all of the connected clients
402  * would consume if they used their full grant.
403  * Caller must hold ofd_grant_lock spinlock.
404  *
405  * \param[in] exp               export releasing grant space
406  * \param[in,out] oa            incoming obdo sent by the client
407  * \param[in] left_space        remaining free space with space already granted
408  *                              taken out
409  */
410 static void ofd_grant_shrink(struct obd_export *exp, struct obdo *oa,
411                              u64 left_space)
412 {
413         struct filter_export_data       *fed;
414         struct ofd_device               *ofd = ofd_exp(exp);
415         struct obd_device               *obd = exp->exp_obd;
416         long                             grant_shrink;
417
418         assert_spin_locked(&ofd->ofd_grant_lock);
419         LASSERT(exp);
420         if (left_space >= ofd->ofd_tot_granted_clients *
421                           OFD_GRANT_SHRINK_LIMIT(exp))
422                 return;
423
424         grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
425
426         fed = &exp->exp_filter_data;
427         fed->fed_grant       -= grant_shrink;
428         ofd->ofd_tot_granted -= grant_shrink;
429
430         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
431                LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
432                exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
433
434         /* client has just released some grant, don't grant any space back */
435         oa->o_grant = 0;
436 }
437
438 /**
439  * Calculate how much space is required to write a given network buffer
440  *
441  * This function takes block alignment into account to estimate how much on-disk
442  * space will be required to successfully write the whole niobuf.
443  * Estimated space is inflated if the export does not support
444  * OBD_CONNECT_GRANT_PARAM and if the backend filesystem has a block size
445  * larger than the minimal supported page size (i.e. 4KB).
446  *
447  * \param[in] exp       export associated which the write request
448  * \param[in] ofd       ofd device handling the request
449  * \param[in] rnb       network buffer to estimate size of
450  *
451  * \retval              space (in bytes) that will be consumed to write the
452  *                      network buffer
453  */
454 static inline u64 ofd_grant_rnb_size(struct obd_export *exp,
455                                      struct ofd_device *ofd,
456                                      struct niobuf_remote *rnb)
457 {
458         u64 blocksize;
459         u64 bytes;
460         u64 end;
461
462         if (exp && ofd_grant_compat(exp, ofd))
463                 blocksize = 1ULL << COMPAT_BSIZE_SHIFT;
464         else
465                 blocksize = 1ULL << ofd->ofd_blockbits;
466
467         /* The network buffer might span several blocks, align it on block
468          * boundaries */
469         bytes  = rnb->rnb_offset & (blocksize - 1);
470         bytes += rnb->rnb_len;
471         end    = bytes & (blocksize - 1);
472         if (end)
473                 bytes += blocksize - end;
474         if (exp)
475                 /* Apply per-export pecularities if one is given */
476                 bytes = ofd_grant_from_cli(exp, ofd, bytes);
477         return bytes;
478 }
479
480 /**
481  * Validate grant accounting for each incoming remote network buffer.
482  *
483  * When clients have dirtied as much space as they've been granted they
484  * fall through to sync writes. These sync writes haven't been expressed
485  * in grants and need to error with ENOSPC when there isn't room in the
486  * filesystem for them after grants are taken into account. However,
487  * writeback of the dirty data that was already granted space can write
488  * right on through.
489  * The OBD_BRW_GRANTED flag will be set in the rnb_flags of each network
490  * buffer which has been granted enough space to proceed. Buffers without
491  * this flag will fail to be written with -ENOSPC (see ofd_preprw_write().
492  * Caller must hold ofd_grant_lock spinlock.
493  *
494  * \param[in] env       LU environment passed by the caller
495  * \param[in] exp       export identifying the client which sent the RPC
496  * \param[in] oa        incoming obdo in which we should return the pack the
497  *                      additional grant
498  * \param[in,out] rnb   the list of network buffers
499  * \param[in] niocount  the number of network buffers in the list
500  * \param[in] left      the remaining free space with space already granted
501  *                      taken out
502  */
503 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
504                             struct obdo *oa, struct niobuf_remote *rnb,
505                             int niocount, u64 *left)
506 {
507         struct filter_export_data       *fed = &exp->exp_filter_data;
508         struct obd_device               *obd = exp->exp_obd;
509         struct ofd_device               *ofd = ofd_exp(exp);
510         unsigned long                    ungranted = 0;
511         unsigned long                    granted = 0;
512         int                              i;
513         int                              resend = 0;
514         struct ofd_thread_info          *info = ofd_info(env);
515
516         ENTRY;
517
518         assert_spin_locked(&ofd->ofd_grant_lock);
519
520         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
521             (oa->o_flags & OBD_FL_RECOV_RESEND)) {
522                 resend = 1;
523                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
524                                 "accounting\n");
525         }
526
527         for (i = 0; i < niocount; i++) {
528                 int bytes;
529
530                 if (obd->obd_recovering) {
531                         /* Replaying write. Grant info have been processed
532                          * already so no need to do any enforcement here.
533                          * It is worth noting that only bulk writes with all
534                          * rnbs having OBD_BRW_FROM_GRANT can be replayed.
535                          * If one page hasn't OBD_BRW_FROM_GRANT set, then
536                          * the whole bulk is written synchronously */
537                         if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
538                                  rnb[i].rnb_flags |= OBD_BRW_GRANTED;
539                                  continue;
540                         } else {
541                                 CERROR("%s: cli %s is replaying OST_WRITE "
542                                        "while one rnb hasn't OBD_BRW_FROM_GRANT"
543                                        " set (0x%x)\n", exp->exp_obd->obd_name,
544                                         exp->exp_client_uuid.uuid,
545                                         rnb[i].rnb_flags);
546
547                         }
548                 } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
549                            (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
550                         if (resend) {
551                                 /* This is a recoverable resend so grant
552                                  * information have already been processed */
553                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
554                                 continue;
555                         }
556
557                         /* inflate consumed space if needed */
558                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
559                         if (fed->fed_grant < granted + bytes) {
560                                 CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
561                                        "GRANT, real grant %lu idx %d\n",
562                                        exp->exp_obd->obd_name,
563                                        exp->exp_client_uuid.uuid, exp,
564                                        granted, bytes, fed->fed_grant, i);
565                         } else {
566                                 granted += bytes;
567                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
568                                 continue;
569                         }
570                 }
571
572                 /* Consume grant space on the server.
573                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
574                  * so that the required grant space isn't inflated. This is
575                  * done on purpose since the server can deal with large block
576                  * size, unlike some clients */
577                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
578                 if (*left > ungranted + bytes) {
579                         /* if enough space, pretend it was granted */
580                         ungranted += bytes;
581                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
582                         continue;
583                 }
584
585                 /* We can't check for already-mapped blocks here (make sense
586                  * when backend filesystem does not use COW) as it requires
587                  * dropping the grant lock.
588                  * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
589                  * to go through and verify if all of the blocks not marked
590                  *  BRW_GRANTED are already mapped and we can ignore this error.
591                  */
592                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
593                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
594                                 exp->exp_obd->obd_name,
595                                 exp->exp_client_uuid.uuid, exp, i, bytes);
596         }
597
598         /* record space used for the I/O, will be used in ofd_grant_commmit() */
599         /* Now substract what the clients has used already.  We don't subtract
600          * this from the tot_granted yet, so that other client's can't grab
601          * that space before we have actually allocated our blocks. That
602          * happens in ofd_grant_commit() after the writes are done. */
603         info->fti_used = granted + ungranted;
604         *left -= ungranted;
605         fed->fed_grant -= granted;
606         fed->fed_pending += info->fti_used;
607         ofd->ofd_tot_granted += ungranted;
608         ofd->ofd_tot_pending += info->fti_used;
609
610         CDEBUG(D_CACHE,
611                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
612                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
613                granted, ungranted, fed->fed_grant, fed->fed_dirty);
614
615         if (obd->obd_recovering)
616                 /* don't update dirty accounting during recovery */
617                 RETURN_EXIT;
618
619         if (fed->fed_dirty < granted) {
620                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
621                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
622                        granted, fed->fed_dirty);
623                 granted = fed->fed_dirty;
624         }
625         ofd->ofd_tot_dirty -= granted;
626         fed->fed_dirty -= granted;
627
628         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
629                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
630                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
631                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
632                 spin_unlock(&ofd->ofd_grant_lock);
633                 LBUG();
634         }
635         EXIT;
636 }
637
638 /**
639  * Allocate additional grant space to a client
640  *
641  * Calculate how much grant space to return to client, based on how much space
642  * is currently free and how much of that is already granted.
643  * Caller must hold ofd_grant_lock spinlock.
644  *
645  * \param[in] exp               export of the client which sent the request
646  * \param[in] curgrant          current grant claimed by the client
647  * \param[in] want              how much grant space the client would like to
648  *                              have
649  * \param[in] left              remaining free space with granted space taken
650  *                              out
651  * \param[in] conservative      if set to true, the server should be cautious
652  *                              and limit how much space is granted back to the
653  *                              client. Otherwise, the server should try hard to
654  *                              satisfy the client request.
655  *
656  * \retval                      amount of grant space allocated
657  */
658 static long ofd_grant_alloc(struct obd_export *exp, u64 curgrant,
659                             u64 want, u64 left, bool conservative)
660 {
661         struct obd_device               *obd = exp->exp_obd;
662         struct ofd_device               *ofd = ofd_exp(exp);
663         struct filter_export_data       *fed = &exp->exp_filter_data;
664         long                             grant_chunk;
665         u64                              grant;
666
667         ENTRY;
668
669         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
670                 RETURN(0);
671
672         if (want > 0x7fffffff) {
673                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
674                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
675                 RETURN(0);
676         }
677
678         /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
679          * size while the reality is different */
680         curgrant = ofd_grant_from_cli(exp, ofd, curgrant);
681         want = ofd_grant_from_cli(exp, ofd, want);
682         grant_chunk = ofd_grant_chunk(exp, ofd);
683
684         /* Grant some fraction of the client's requested grant space so that
685          * they are not always waiting for write credits (not all of it to
686          * avoid overgranting in face of multiple RPCs in flight).  This
687          * essentially will be able to control the OSC_MAX_RIF for a client.
688          *
689          * If we do have a large disparity between what the client thinks it
690          * has and what we think it has, don't grant very much and let the
691          * client consume its grant first.  Either it just has lots of RPCs
692          * in flight, or it was evicted and its grants will soon be used up. */
693         if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
694                    RETURN(0);
695
696         if (obd->obd_recovering)
697                 conservative = false;
698
699         if (conservative)
700                 /* don't grant more than 1/8th of the remaining free space in
701                  * one chunk */
702                 left >>= 3;
703         grant = min(want - curgrant, left);
704         /* round grant upt to the next block size */
705         grant = (grant + (1 << ofd->ofd_blockbits) - 1) &
706                 ~((1ULL << ofd->ofd_blockbits) - 1);
707
708         if (!grant)
709                 RETURN(0);
710
711         /* Limit to ofd_grant_chunk() if not reconnect/recovery */
712         if ((grant > grant_chunk) && conservative)
713                 grant = grant_chunk;
714
715         ofd->ofd_tot_granted += grant;
716         fed->fed_grant += grant;
717
718         if (fed->fed_grant < 0) {
719                 CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
720                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
721                        fed->fed_grant, want, curgrant);
722                 spin_unlock(&ofd->ofd_grant_lock);
723                 LBUG();
724         }
725
726         CDEBUG(D_CACHE,
727                "%s: cli %s/%p wants: "LPU64" current grant "LPU64
728                " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
729                exp, want, curgrant, grant);
730         CDEBUG(D_CACHE,
731                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
732                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
733                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
734                obd->obd_num_exports);
735
736         RETURN(ofd_grant_to_cli(exp, ofd, grant));
737 }
738
739 /**
740  * Handle grant space allocation on client connection & reconnection.
741  *
742  * A new non-readonly connection gets an initial grant allocation equals to
743  * ofd_grant_chunk() (i.e. twice the max BRW size in most of the cases).
744  * On reconnection, grant counters between client & OST are resynchronized
745  * and additional space might be granted back if possible.
746  *
747  * \param[in] env       LU environment provided by the caller
748  * \param[in] exp       client's export which is (re)connecting
749  * \param[in] want      how much grant space the client would like to get
750  * \param[in] new_conn  must set to true if this is a new connection and false
751  *                      for a reconnection
752  *
753  * \retval              amount of grant space currently owned by the client
754  */
755 long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
756                        u64 want, bool new_conn)
757 {
758         struct ofd_device               *ofd = ofd_exp(exp);
759         struct filter_export_data       *fed = &exp->exp_filter_data;
760         u64                              left = 0;
761         long                             grant;
762         int                              from_cache;
763         int                              force = 0; /* can use cached data */
764
765         /* don't grant space to client with read-only access */
766         if ((exp_connect_flags(exp) & OBD_CONNECT_RDONLY) ||
767             ofd_grant_prohibit(exp, ofd))
768                 return 0;
769
770 refresh:
771         ofd_grant_statfs(env, exp, force, &from_cache);
772
773         spin_lock(&ofd->ofd_grant_lock);
774
775         /* Grab free space from cached info and take out space already granted
776          * to clients as well as reserved space */
777         left = ofd_grant_space_left(exp);
778
779         /* get fresh statfs data if we are short in ungranted space */
780         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
781                 spin_unlock(&ofd->ofd_grant_lock);
782                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
783                 force = 1;
784                 goto refresh;
785         }
786
787         ofd_grant_alloc(exp,
788                         ofd_grant_to_cli(exp, ofd, (u64)fed->fed_grant),
789                         want, left, new_conn);
790
791         /* return to client its current grant */
792         grant = ofd_grant_to_cli(exp, ofd, (u64)fed->fed_grant);
793         ofd->ofd_tot_granted_clients++;
794
795         spin_unlock(&ofd->ofd_grant_lock);
796
797         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
798                LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
799                exp, grant, want, left);
800
801         return grant;
802 }
803
804 /**
805  * Release all grant space attached to a given export.
806  *
807  * Remove a client from the grant accounting totals.  We also remove
808  * the export from the obd device under the osfs and dev locks to ensure
809  * that the ofd_grant_sanity_check() calculations are always valid.
810  * The client should do something similar when it invalidates its import.
811  *
812  * \param[in] exp       client's export to remove from grant accounting
813  */
814 void ofd_grant_discard(struct obd_export *exp)
815 {
816         struct obd_device               *obd = exp->exp_obd;
817         struct ofd_device               *ofd = ofd_exp(exp);
818         struct filter_export_data       *fed = &exp->exp_filter_data;
819
820         spin_lock(&ofd->ofd_grant_lock);
821         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
822                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
823                  obd->obd_name, ofd->ofd_tot_granted,
824                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
825         ofd->ofd_tot_granted -= fed->fed_grant;
826         fed->fed_grant = 0;
827         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
828                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
829                  obd->obd_name, ofd->ofd_tot_pending,
830                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
831         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
832          * finishes */
833         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
834                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
835                  obd->obd_name, ofd->ofd_tot_dirty,
836                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
837         ofd->ofd_tot_dirty -= fed->fed_dirty;
838         fed->fed_dirty = 0;
839         spin_unlock(&ofd->ofd_grant_lock);
840 }
841
842 /**
843  * Process grant information from incoming bulk read request.
844  *
845  * Extract grant information packed in obdo structure (OBD_MD_FLGRANT set in
846  * o_valid). Bulk reads usually comes with grant announcements (number of dirty
847  * blocks, remaining amount of grant space, ...) and could also include a grant
848  * shrink request. Unlike bulk write, no additional grant space is returned on
849  * bulk read request.
850  *
851  * \param[in] env       is the lu environment provided by the caller
852  * \param[in] exp       is the export of the client which sent the request
853  * \param[in,out] oa    is the incoming obdo sent by the client
854  */
855 void ofd_grant_prepare_read(const struct lu_env *env,
856                             struct obd_export *exp, struct obdo *oa)
857 {
858         struct ofd_device       *ofd = ofd_exp(exp);
859         int                      do_shrink;
860         u64                      left = 0;
861
862         if (!oa)
863                 return;
864
865         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
866                 /* The read request does not contain any grant
867                  * information */
868                 return;
869
870         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
871             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
872                 /* To process grant shrink request, we need to know how much
873                  * available space remains on the backend filesystem.
874                  * Shrink requests are not so common, we always get fresh
875                  * statfs information. */
876                 ofd_grant_statfs(env, exp, 1, NULL);
877
878                 /* protect all grant counters */
879                 spin_lock(&ofd->ofd_grant_lock);
880
881                 /* Grab free space from cached statfs data and take out space
882                  * already granted to clients as well as reserved space */
883                 left = ofd_grant_space_left(exp);
884
885                 /* all set now to proceed with shrinking */
886                 do_shrink = 1;
887         } else {
888                 /* no grant shrinking request packed in the obdo and
889                  * since we don't grant space back on reads, no point
890                  * in running statfs, so just skip it and process
891                  * incoming grant data directly. */
892                 spin_lock(&ofd->ofd_grant_lock);
893                 do_shrink = 0;
894         }
895
896         /* extract incoming grant infomation provided by the client */
897         ofd_grant_incoming(env, exp, oa);
898
899         /* unlike writes, we don't return grants back on reads unless a grant
900          * shrink request was packed and we decided to turn it down. */
901         if (do_shrink)
902                 ofd_grant_shrink(exp, oa, left);
903         else
904                 oa->o_grant = 0;
905
906         spin_unlock(&ofd->ofd_grant_lock);
907 }
908
909 /**
910  * Process grant information from incoming bulk write request.
911  *
912  * This function extracts client's grant announcements from incoming bulk write
913  * request and attempts to allocate grant space for network buffers that need it
914  * (i.e. OBD_BRW_FROM_GRANT not set in rnb_fags).
915  * Network buffers which aren't granted the OBD_BRW_GRANTED flag should not
916  * proceed further and should fail with -ENOSPC.
917  * Whenever possible, additional grant space will be returned to the client
918  * in the bulk write reply.
919  * ofd_grant_prepare_write() must be called before writting any buffers to
920  * the backend storage. This function works in pair with ofd_grant_commit()
921  * which must be invoked once all buffers have been written to disk in order
922  * to release space from the pending grant counter.
923  *
924  * \param[in] env       LU environment provided by the caller
925  * \param[in] exp       export of the client which sent the request
926  * \param[in] oa        incoming obdo sent by the client
927  * \param[in] rnb       list of network buffers
928  * \param[in] niocount  number of network buffers in the list
929  */
930 void ofd_grant_prepare_write(const struct lu_env *env,
931                              struct obd_export *exp, struct obdo *oa,
932                              struct niobuf_remote *rnb, int niocount)
933 {
934         struct obd_device       *obd = exp->exp_obd;
935         struct ofd_device       *ofd = ofd_exp(exp);
936         u64                      left;
937         int                      from_cache;
938         int                      force = 0; /* can use cached data intially */
939         int                      rc;
940
941         ENTRY;
942
943 refresh:
944         /* get statfs information from OSD layer */
945         ofd_grant_statfs(env, exp, force, &from_cache);
946
947         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
948
949         /* Grab free space from cached statfs data and take out space already
950          * granted to clients as well as reserved space */
951         left = ofd_grant_space_left(exp);
952
953         /* Get fresh statfs data if we are short in ungranted space */
954         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
955                 spin_unlock(&ofd->ofd_grant_lock);
956                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
957                        obd->obd_name);
958                 force = 1;
959                 goto refresh;
960         }
961
962         /* When close to free space exhaustion, trigger a sync to force
963          * writeback cache to consume required space immediately and release as
964          * much space as possible. */
965         if (!obd->obd_recovering && force != 2 && left < OFD_GRANT_CHUNK) {
966                 bool from_grant = true;
967                 int  i;
968
969                 /* That said, it is worth running a sync only if some pages did
970                  * not consume grant space on the client and could thus fail
971                  * with ENOSPC later in ofd_grant_check() */
972                 for (i = 0; i < niocount; i++)
973                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
974                                 from_grant = false;
975
976                 if (!from_grant) {
977                         /* at least one network buffer requires acquiring grant
978                          * space on the server */
979                         spin_unlock(&ofd->ofd_grant_lock);
980                         /* discard errors, at least we tried ... */
981                         rc = dt_sync(env, ofd->ofd_osd);
982                         force = 2;
983                         goto refresh;
984                 }
985         }
986
987         /* extract incoming grant information provided by the client */
988         ofd_grant_incoming(env, exp, oa);
989
990         /* check limit */
991         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
992
993         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
994                 spin_unlock(&ofd->ofd_grant_lock);
995                 RETURN_EXIT;
996         }
997
998         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
999          * grant space. */
1000         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1001             (oa->o_flags & OBD_FL_SHRINK_GRANT))
1002                 ofd_grant_shrink(exp, oa, left);
1003         else
1004                 /* grant more space back to the client if possible */
1005                 oa->o_grant = ofd_grant_alloc(exp, oa->o_grant, oa->o_undirty,
1006                                               left, true);
1007         spin_unlock(&ofd->ofd_grant_lock);
1008 }
1009
1010 /**
1011  * Consume grant space reserved for object creation.
1012  *
1013  * Grant space is allocated to the local self export for object precreation.
1014  * This is required to prevent object precreation from consuming grant space
1015  * allocated to client nodes for the data writeback cache.
1016  * This function consumes enough space to create \a nr objects and allocates
1017  * more grant space to the self export for future precreation requests, if
1018  * possible.
1019  *
1020  * \param[in] env       LU environment provided by the caller
1021  * \param[in] exp       export holding the grant space for precreation (= self
1022  *                      export currently)
1023  * \param[in] nr        number of objects to be created
1024  *
1025  * \retval 0            for success
1026  * \retval -ENOSPC      on failure
1027  */
1028 int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
1029 {
1030         struct ofd_thread_info          *info = ofd_info(env);
1031         struct ofd_device               *ofd = ofd_exp(exp);
1032         struct filter_export_data       *fed = &exp->exp_filter_data;
1033         u64                              left = 0;
1034         unsigned long                    wanted;
1035         ENTRY;
1036
1037         info->fti_used = 0;
1038
1039         if (exp->exp_obd->obd_recovering ||
1040             ofd->ofd_dt_conf.ddp_inodespace == 0)
1041                 /* don't enforce grant during recovery */
1042                 RETURN(0);
1043
1044         /* Update statfs data if required */
1045         ofd_grant_statfs(env, exp, 1, NULL);
1046
1047         /* protect all grant counters */
1048         spin_lock(&ofd->ofd_grant_lock);
1049
1050         /* fail precreate request if there is not enough blocks available for
1051          * writing */
1052         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
1053             (ofd->ofd_osfs.os_blocks >> 10)) {
1054                 spin_unlock(&ofd->ofd_grant_lock);
1055                 CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
1056                        ofd_name(ofd),
1057                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
1058                 RETURN(-ENOSPC);
1059         }
1060
1061         /* Grab free space from cached statfs data and take out space
1062          * already granted to clients as well as reserved space */
1063         left = ofd_grant_space_left(exp);
1064
1065         /* compute how much space is required to handle the precreation
1066          * request */
1067         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1068         if (wanted > fed->fed_grant + left) {
1069                 /* that's beyond what remains, adjust the number of objects that
1070                  * can be safely precreated */
1071                 wanted = fed->fed_grant + left;
1072                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
1073                 if (*nr == 0) {
1074                         /* we really have no space any more for precreation,
1075                          * fail the precreate request with ENOSPC */
1076                         spin_unlock(&ofd->ofd_grant_lock);
1077                         RETURN(-ENOSPC);
1078                 }
1079                 /* compute space needed for the new number of creations */
1080                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1081         }
1082         LASSERT(wanted <= fed->fed_grant + left);
1083
1084         if (wanted <= fed->fed_grant) {
1085                 /* we've enough grant space to handle this precreate request */
1086                 fed->fed_grant -= wanted;
1087         } else {
1088                 /* we need to take some space from the ungranted pool */
1089                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
1090                 left -= wanted - fed->fed_grant;
1091                 fed->fed_grant = 0;
1092         }
1093         info->fti_used = wanted;
1094         fed->fed_pending += info->fti_used;
1095         ofd->ofd_tot_pending += info->fti_used;
1096
1097         /* grant more space for precreate purpose if possible. */
1098         wanted = OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
1099         if (wanted > fed->fed_grant) {
1100                 /* always try to book enough space to handle a large precreate
1101                  * request */
1102                 wanted -= fed->fed_grant;
1103                 ofd_grant_alloc(exp, fed->fed_grant, wanted, left, false);
1104         }
1105         spin_unlock(&ofd->ofd_grant_lock);
1106         RETURN(0);
1107 }
1108
1109 /**
1110  * Release grant space added to the pending counter by ofd_grant_prepare_write()
1111  *
1112  * Update pending grant counter once buffers have been written to the disk.
1113  *
1114  * \param[in] env       LU environment provided by the caller
1115  * \param[in] exp       export of the client which sent the request
1116  * \param[in] rc        return code of pre-commit operations
1117  */
1118 void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp,
1119                       int rc)
1120 {
1121         struct ofd_device       *ofd  = ofd_exp(exp);
1122         struct ofd_thread_info  *info = ofd_info(env);
1123         unsigned long            pending;
1124
1125         ENTRY;
1126
1127         /* get space accounted in tot_pending for the I/O, set in
1128          * ofd_grant_check() */
1129         pending = info->fti_used;
1130         if (pending == 0)
1131                 RETURN_EXIT;
1132
1133         spin_lock(&ofd->ofd_grant_lock);
1134         /* Don't update statfs data for errors raised before commit (e.g.
1135          * bulk transfer failed, ...) since we know those writes have not been
1136          * processed. For other errors hit during commit, we cannot really tell
1137          * whether or not something was written, so we update statfs data.
1138          * In any case, this should not be fatal since we always get fresh
1139          * statfs data before failing a request with ENOSPC */
1140         if (rc == 0) {
1141                 spin_lock(&ofd->ofd_osfs_lock);
1142                 /* Take pending out of cached statfs data */
1143                 ofd->ofd_osfs.os_bavail -= min_t(u64,
1144                                                  ofd->ofd_osfs.os_bavail,
1145                                                  pending >> ofd->ofd_blockbits);
1146                 if (ofd->ofd_statfs_inflight)
1147                         /* someone is running statfs and want to be notified of
1148                          * writes happening meanwhile */
1149                         ofd->ofd_osfs_inflight += pending;
1150                 spin_unlock(&ofd->ofd_osfs_lock);
1151         }
1152
1153         if (exp->exp_filter_data.fed_pending < pending) {
1154                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1155                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1156                        exp->exp_filter_data.fed_pending, pending);
1157                 spin_unlock(&ofd->ofd_grant_lock);
1158                 LBUG();
1159         }
1160         exp->exp_filter_data.fed_pending -= pending;
1161
1162         if (ofd->ofd_tot_granted < pending) {
1163                  CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
1164                         "\n", exp->exp_obd->obd_name,
1165                         exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
1166                         pending);
1167                 spin_unlock(&ofd->ofd_grant_lock);
1168                 LBUG();
1169         }
1170         ofd->ofd_tot_granted -= pending;
1171
1172         if (ofd->ofd_tot_pending < pending) {
1173                  CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
1174                         "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1175                         exp, ofd->ofd_tot_pending, pending);
1176                 spin_unlock(&ofd->ofd_grant_lock);
1177                 LBUG();
1178         }
1179         ofd->ofd_tot_pending -= pending;
1180         spin_unlock(&ofd->ofd_grant_lock);
1181         EXIT;
1182 }