Whamcloud - gitweb
5ef22b5e5002c0bb14d85e62fe36ab28f412e394
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32 /*
33  * lustre/ofd/ofd_grant.c
34  *
35  * This file provides code related to grant space management on Object Storage
36  * Targets (OSTs). Grant is a mechanism used by client nodes to reserve disk
37  * space on OSTs for the data writeback cache. The Lustre client is thus assured
38  * that enough space will be available when flushing dirty pages asynchronously.
39  * Each client node is granted an initial amount of reserved space at connect
40  * time and gets additional space back from OST in bulk write reply.
41  *
42  * This file handles the core logic for:
43  * - grant allocation strategy
44  * - maintaining per-client as well as global grant space accounting
45  * - processing grant information packed in incoming requests
46  * - allocating server-side grant space for synchronous write RPCs which did not
47  *   consume grant on the client side (OBD_BRW_FROM_GRANT flag not set). If not
48  *   enough space is available, such RPCs fail with ENOSPC
49  *
50  * Author: Johann Lombardi <johann.lombardi@intel.com>
51  */
52
53 #define DEBUG_SUBSYSTEM S_FILTER
54
55 #include "ofd_internal.h"
56
57 /* At least enough to send a couple of 1MB RPCs, even if not max sized */
58 #define OFD_GRANT_CHUNK                 (2ULL * DT_MAX_BRW_SIZE)
59
60 /* Clients typically hold 2x their max_rpcs_in_flight of grant space */
61 #define OFD_GRANT_SHRINK_LIMIT(exp)     (2ULL * 8 * exp_max_brw_size(exp))
62
63 static inline u64 ofd_grant_from_cli(struct obd_export *exp,
64                                      struct ofd_device *ofd, u64 val)
65 {
66         if (ofd_grant_compat(exp, ofd))
67                 /* clients not supporting OBD_CONNECT_GRANT_PARAM actually
68                  * consume 4KB of grant per block, we should thus inflate
69                  * the grant counters to reflect what was actually consumed */
70                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
71         return val;
72 }
73
74 static inline u64 ofd_grant_to_cli(struct obd_export *exp,
75                                    struct ofd_device *ofd, u64 val)
76 {
77         if (ofd_grant_compat(exp, ofd))
78                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
79         return val;
80 }
81
82 static inline u64 ofd_grant_chunk(struct obd_export *exp,
83                                   struct ofd_device *ofd)
84 {
85         if (ofd_obd(ofd)->obd_self_export == exp)
86                 /* Grant enough space to handle a big precreate request */
87                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
88
89         if (ofd_grant_compat(exp, ofd))
90                 /* Try to grant enough space to send a full-size RPC */
91                 return exp_max_brw_size(exp) <<
92                        (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
93
94         /* Try to return enough to send two full RPCs, if needed */
95         return exp_max_brw_size(exp) * 2;
96 }
97
98 /**
99  * Perform extra sanity checks for grant accounting.
100  *
101  * This function scans the export list, sanity checks per-export grant counters
102  * and verifies accuracy of global grant accounting. If an inconsistency is
103  * found, a CERROR is printed with the function name \func that was passed as
104  * argument. LBUG is only called in case of serious counter corruption (i.e.
105  * value larger than the device size).
106  * Those sanity checks can be pretty expensive and are disabled if the OBD
107  * device has more than 100 connected exports.
108  *
109  * \param[in] obd       OBD device for which grant accounting should be
110  *                      verified
111  * \param[in] func      caller's function name
112  */
113 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
114 {
115         struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
116         struct obd_export *exp;
117         u64                maxsize;
118         u64                tot_dirty = 0;
119         u64                tot_pending = 0;
120         u64                tot_granted = 0;
121         u64                fo_tot_granted;
122         u64                fo_tot_pending;
123         u64                fo_tot_dirty;
124
125         if (list_empty(&obd->obd_exports))
126                 return;
127
128         /* We don't want to do this for large machines that do lots of
129          * mounts or unmounts.  It burns... */
130         if (obd->obd_num_exports > 100)
131                 return;
132
133         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
134
135         spin_lock(&obd->obd_dev_lock);
136         spin_lock(&ofd->ofd_grant_lock);
137         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
138                 struct filter_export_data       *fed;
139                 int                              error = 0;
140
141                 fed = &exp->exp_filter_data;
142
143                 if (obd->obd_self_export == exp)
144                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
145                                "%ld\n", obd->obd_name, fed->fed_grant,
146                                fed->fed_pending, fed->fed_dirty);
147
148                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
149                     fed->fed_dirty < 0)
150                         error = 1;
151                 if (fed->fed_grant + fed->fed_pending > maxsize) {
152                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
153                                " > maxsize("LPU64")\n", obd->obd_name,
154                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
155                                fed->fed_pending, maxsize);
156                         spin_unlock(&obd->obd_dev_lock);
157                         spin_unlock(&ofd->ofd_grant_lock);
158                         LBUG();
159                 }
160                 if (fed->fed_dirty > maxsize) {
161                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
162                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
163                                exp, fed->fed_dirty, maxsize);
164                         spin_unlock(&obd->obd_dev_lock);
165                         spin_unlock(&ofd->ofd_grant_lock);
166                         LBUG();
167                 }
168                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
169                              "%ld pend %ld grant %ld\n", obd->obd_name,
170                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
171                              fed->fed_pending, fed->fed_grant);
172                 tot_granted += fed->fed_grant + fed->fed_pending;
173                 tot_pending += fed->fed_pending;
174                 tot_dirty += fed->fed_dirty;
175         }
176
177         /* exports about to be unlinked should also be taken into account since
178          * they might still hold pending grant space to be released at
179          * commit time */
180         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain) {
181                 struct filter_export_data       *fed;
182                 int                              error = 0;
183
184                 fed = &exp->exp_filter_data;
185
186                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
187                     fed->fed_dirty < 0)
188                         error = 1;
189                 if (fed->fed_grant + fed->fed_pending > maxsize) {
190                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
191                                " > maxsize("LPU64")\n", obd->obd_name,
192                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
193                                fed->fed_pending, maxsize);
194                         spin_unlock(&obd->obd_dev_lock);
195                         spin_unlock(&ofd->ofd_grant_lock);
196                         LBUG();
197                 }
198                 if (fed->fed_dirty > maxsize) {
199                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
200                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
201                                exp, fed->fed_dirty, maxsize);
202                         spin_unlock(&obd->obd_dev_lock);
203                         spin_unlock(&ofd->ofd_grant_lock);
204                         LBUG();
205                 }
206                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
207                              "%ld pend %ld grant %ld\n", obd->obd_name,
208                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
209                              fed->fed_pending, fed->fed_grant);
210                 tot_granted += fed->fed_grant + fed->fed_pending;
211                 tot_pending += fed->fed_pending;
212                 tot_dirty += fed->fed_dirty;
213         }
214
215         spin_unlock(&obd->obd_dev_lock);
216         fo_tot_granted = ofd->ofd_tot_granted;
217         fo_tot_pending = ofd->ofd_tot_pending;
218         fo_tot_dirty = ofd->ofd_tot_dirty;
219
220         if (tot_granted != fo_tot_granted)
221                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
222                        func, tot_granted, fo_tot_granted);
223         if (tot_pending != fo_tot_pending)
224                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
225                        func, tot_pending, fo_tot_pending);
226         if (tot_dirty != fo_tot_dirty)
227                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
228                        func, tot_dirty, fo_tot_dirty);
229         if (tot_pending > tot_granted)
230                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
231                        func, tot_pending, tot_granted);
232         if (tot_granted > maxsize)
233                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
234                        func, tot_granted, maxsize);
235         if (tot_dirty > maxsize)
236                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
237                        func, tot_dirty, maxsize);
238         spin_unlock(&ofd->ofd_grant_lock);
239 }
240
241 /**
242  * Update cached statfs information from the OSD layer
243  *
244  * Refresh statfs information cached in ofd::ofd_osfs if the cache is older
245  * than 1s or if force is set. The OSD layer is in charge of estimating data &
246  * metadata overhead.
247  * This function can sleep so it should not be called with any spinlock held.
248  *
249  * \param[in] env               LU environment passed by the caller
250  * \param[in] exp               export used to print client info in debug
251  *                              messages
252  * \param[in] force             force a refresh of statfs information
253  * \param[out] from_cache       returns whether the statfs information are
254  *                              taken from cache
255  */
256 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
257                              int force, int *from_cache)
258 {
259         struct obd_device       *obd = exp->exp_obd;
260         struct ofd_device       *ofd = ofd_exp(exp);
261         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
262         __u64                    max_age;
263         int                      rc;
264
265         if (force)
266                 max_age = 0; /* get fresh statfs data */
267         else
268                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
269
270         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
271         if (unlikely(rc)) {
272                 if (from_cache)
273                         *from_cache = 0;
274                 return;
275         }
276
277         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
278                obd->obd_name, exp->exp_client_uuid.uuid, exp,
279                osfs->os_bfree << ofd->ofd_blockbits,
280                osfs->os_bavail << ofd->ofd_blockbits);
281 }
282
283 /**
284  * Figure out how much space is available on the backend filesystem after
285  * removing grant space already booked by clients.
286  *
287  * This is done by accessing cached statfs data previously populated by
288  * ofd_grant_statfs(), from which we withdraw the space already granted to
289  * clients and the reserved space.
290  * Caller must hold ofd_grant_lock spinlock.
291  *
292  * \param[in] exp       export associated with the device for which the amount
293  *                      of available space is requested
294  * \retval              amount of non-allocated space, in bytes
295  */
296 static u64 ofd_grant_space_left(struct obd_export *exp)
297 {
298         struct obd_device *obd = exp->exp_obd;
299         struct ofd_device *ofd = ofd_exp(exp);
300         u64                tot_granted;
301         u64                left;
302         u64                avail;
303         u64                unstable;
304
305         ENTRY;
306         assert_spin_locked(&ofd->ofd_grant_lock);
307
308         spin_lock(&ofd->ofd_osfs_lock);
309         /* get available space from cached statfs data */
310         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
311         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
312         spin_unlock(&ofd->ofd_osfs_lock);
313
314         tot_granted = ofd->ofd_tot_granted;
315
316         if (left < tot_granted) {
317                 int mask = (left + unstable <
318                             tot_granted - ofd->ofd_tot_pending) ?
319                             D_ERROR : D_CACHE;
320
321                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
322                              LPU64" unstable "LPU64" pending "LPU64" "
323                              "dirty "LPU64"\n",
324                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
325                              left, tot_granted, unstable,
326                              ofd->ofd_tot_pending, ofd->ofd_tot_dirty);
327                 RETURN(0);
328         }
329
330         avail = left;
331         /* Withdraw space already granted to clients */
332         left -= tot_granted;
333
334         /* If the left space is below the grant threshold x available space,
335          * stop granting space to clients.
336          * The purpose of this threshold is to keep some error margin on the
337          * overhead estimate made by the OSD layer. If we grant all the free
338          * space, we have no way (grant space cannot be revoked yet) to
339          * adjust if the write overhead has been underestimated. */
340         left -= min_t(u64, left, ofd_grant_reserved(ofd, avail));
341
342         /* Align left on block size */
343         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
344
345         CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
346                LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
347                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
348                tot_granted, ofd->ofd_tot_pending);
349
350         RETURN(left);
351 }
352
353 /**
354  * Process grant information from obdo structure packed in incoming BRW
355  *
356  * Grab the dirty and seen grant announcements from the incoming obdo.
357  * We will later calculate the client's new grant and return it.
358  * Caller must hold ofd_grant_lock spinlock.
359  *
360  * \param[in] env       LU environment supplying osfs storage
361  * \param[in] exp       export for which we received the request
362  * \param[in,out] oa    incoming obdo sent by the client
363  *
364  */
365 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
366                                struct obdo *oa)
367 {
368         struct filter_export_data       *fed;
369         struct ofd_device               *ofd = ofd_exp(exp);
370         struct obd_device               *obd = exp->exp_obd;
371         long                             dirty;
372         long                             dropped;
373         long                             grant_chunk;
374         ENTRY;
375
376         assert_spin_locked(&ofd->ofd_grant_lock);
377
378         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
379                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
380                 oa->o_valid &= ~OBD_MD_FLGRANT;
381                 RETURN_EXIT;
382         }
383
384         fed = &exp->exp_filter_data;
385
386         /* Add some margin, since there is a small race if other RPCs arrive
387          * out-or-order and have already consumed some grant.  We want to
388          * leave this here in case there is a large error in accounting. */
389         CDEBUG(D_CACHE,
390                "%s: cli %s/%p reports grant "LPU64" dropped %u, local %lu\n",
391                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
392                oa->o_dropped, fed->fed_grant);
393
394         if ((long long)oa->o_dirty < 0)
395                 oa->o_dirty = 0;
396
397         dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
398         dropped     = ofd_grant_from_cli(exp, ofd, (u64)oa->o_dropped);
399         grant_chunk = ofd_grant_chunk(exp, ofd);
400
401         /* Update our accounting now so that statfs takes it into account.
402          * Note that fed_dirty is only approximate and can become incorrect
403          * if RPCs arrive out-of-order.  No important calculations depend
404          * on fed_dirty however, but we must check sanity to not assert. */
405         if (dirty > fed->fed_grant + 4 * grant_chunk)
406                 dirty = fed->fed_grant + 4 * grant_chunk;
407         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
408         if (fed->fed_grant < dropped) {
409                 CDEBUG(D_CACHE,
410                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
411                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
412                        fed->fed_grant);
413                 dropped = 0;
414         }
415         if (ofd->ofd_tot_granted < dropped) {
416                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant "LPU64
417                        "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
418                        dropped, ofd->ofd_tot_granted);
419                 dropped = 0;
420         }
421         ofd->ofd_tot_granted -= dropped;
422         fed->fed_grant -= dropped;
423         fed->fed_dirty = dirty;
424
425         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
426                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
427                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
428                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
429                 spin_unlock(&ofd->ofd_grant_lock);
430                 LBUG();
431         }
432         EXIT;
433 }
434
435 /**
436  * Grant shrink request handler.
437  *
438  * Client nodes can explicitly release grant space (i.e. process called grant
439  * shrinking). This function proceeds with the shrink request when there is
440  * less ungranted space remaining than the amount all of the connected clients
441  * would consume if they used their full grant.
442  * Caller must hold ofd_grant_lock spinlock.
443  *
444  * \param[in] exp               export releasing grant space
445  * \param[in,out] oa            incoming obdo sent by the client
446  * \param[in] left_space        remaining free space with space already granted
447  *                              taken out
448  */
449 static void ofd_grant_shrink(struct obd_export *exp, struct obdo *oa,
450                              u64 left_space)
451 {
452         struct filter_export_data       *fed;
453         struct ofd_device               *ofd = ofd_exp(exp);
454         struct obd_device               *obd = exp->exp_obd;
455         long                             grant_shrink;
456
457         assert_spin_locked(&ofd->ofd_grant_lock);
458         LASSERT(exp);
459         if (left_space >= ofd->ofd_tot_granted_clients *
460                           OFD_GRANT_SHRINK_LIMIT(exp))
461                 return;
462
463         grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
464
465         fed = &exp->exp_filter_data;
466         fed->fed_grant       -= grant_shrink;
467         ofd->ofd_tot_granted -= grant_shrink;
468
469         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
470                LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
471                exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
472
473         /* client has just released some grant, don't grant any space back */
474         oa->o_grant = 0;
475 }
476
477 /**
478  * Calculate how much space is required to write a given network buffer
479  *
480  * This function takes block alignment into account to estimate how much on-disk
481  * space will be required to successfully write the whole niobuf.
482  * Estimated space is inflated if the export does not support
483  * OBD_CONNECT_GRANT_PARAM and if the backend filesystem has a block size
484  * larger than the minimal supported page size (i.e. 4KB).
485  *
486  * \param[in] exp       export associated which the write request
487  * \param[in] ofd       ofd device handling the request
488  * \param[in] rnb       network buffer to estimate size of
489  *
490  * \retval              space (in bytes) that will be consumed to write the
491  *                      network buffer
492  */
493 static inline u64 ofd_grant_rnb_size(struct obd_export *exp,
494                                      struct ofd_device *ofd,
495                                      struct niobuf_remote *rnb)
496 {
497         u64 blocksize;
498         u64 bytes;
499         u64 end;
500
501         if (exp && ofd_grant_compat(exp, ofd))
502                 blocksize = 1ULL << COMPAT_BSIZE_SHIFT;
503         else
504                 blocksize = 1ULL << ofd->ofd_blockbits;
505
506         /* The network buffer might span several blocks, align it on block
507          * boundaries */
508         bytes  = rnb->rnb_offset & (blocksize - 1);
509         bytes += rnb->rnb_len;
510         end    = bytes & (blocksize - 1);
511         if (end)
512                 bytes += blocksize - end;
513         if (exp)
514                 /* Apply per-export pecularities if one is given */
515                 bytes = ofd_grant_from_cli(exp, ofd, bytes);
516         return bytes;
517 }
518
519 /**
520  * Validate grant accounting for each incoming remote network buffer.
521  *
522  * When clients have dirtied as much space as they've been granted they
523  * fall through to sync writes. These sync writes haven't been expressed
524  * in grants and need to error with ENOSPC when there isn't room in the
525  * filesystem for them after grants are taken into account. However,
526  * writeback of the dirty data that was already granted space can write
527  * right on through.
528  * The OBD_BRW_GRANTED flag will be set in the rnb_flags of each network
529  * buffer which has been granted enough space to proceed. Buffers without
530  * this flag will fail to be written with -ENOSPC (see ofd_preprw_write().
531  * Caller must hold ofd_grant_lock spinlock.
532  *
533  * \param[in] env       LU environment passed by the caller
534  * \param[in] exp       export identifying the client which sent the RPC
535  * \param[in] oa        incoming obdo in which we should return the pack the
536  *                      additional grant
537  * \param[in,out] rnb   the list of network buffers
538  * \param[in] niocount  the number of network buffers in the list
539  * \param[in] left      the remaining free space with space already granted
540  *                      taken out
541  */
542 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
543                             struct obdo *oa, struct niobuf_remote *rnb,
544                             int niocount, u64 *left)
545 {
546         struct filter_export_data       *fed = &exp->exp_filter_data;
547         struct obd_device               *obd = exp->exp_obd;
548         struct ofd_device               *ofd = ofd_exp(exp);
549         unsigned long                    ungranted = 0;
550         unsigned long                    granted = 0;
551         int                              i;
552         int                              resend = 0;
553         struct ofd_thread_info          *info = ofd_info(env);
554
555         ENTRY;
556
557         assert_spin_locked(&ofd->ofd_grant_lock);
558
559         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
560             (oa->o_flags & OBD_FL_RECOV_RESEND)) {
561                 resend = 1;
562                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
563                                 "accounting\n");
564         }
565
566         for (i = 0; i < niocount; i++) {
567                 int bytes;
568
569                 if (obd->obd_recovering) {
570                         /* Replaying write. Grant info have been processed
571                          * already so no need to do any enforcement here.
572                          * It is worth noting that only bulk writes with all
573                          * rnbs having OBD_BRW_FROM_GRANT can be replayed.
574                          * If one page hasn't OBD_BRW_FROM_GRANT set, then
575                          * the whole bulk is written synchronously */
576                         if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
577                                  rnb[i].rnb_flags |= OBD_BRW_GRANTED;
578                                  continue;
579                         } else {
580                                 CERROR("%s: cli %s is replaying OST_WRITE "
581                                        "while one rnb hasn't OBD_BRW_FROM_GRANT"
582                                        " set (0x%x)\n", exp->exp_obd->obd_name,
583                                         exp->exp_client_uuid.uuid,
584                                         rnb[i].rnb_flags);
585
586                         }
587                 } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
588                            (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
589                         if (resend) {
590                                 /* This is a recoverable resend so grant
591                                  * information have already been processed */
592                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
593                                 continue;
594                         }
595
596                         /* inflate consumed space if needed */
597                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
598                         if (fed->fed_grant < granted + bytes) {
599                                 CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
600                                        "GRANT, real grant %lu idx %d\n",
601                                        exp->exp_obd->obd_name,
602                                        exp->exp_client_uuid.uuid, exp,
603                                        granted, bytes, fed->fed_grant, i);
604                         } else {
605                                 granted += bytes;
606                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
607                                 continue;
608                         }
609                 }
610
611                 /* Consume grant space on the server.
612                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
613                  * so that the required grant space isn't inflated. This is
614                  * done on purpose since the server can deal with large block
615                  * size, unlike some clients */
616                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
617                 if (*left > ungranted + bytes) {
618                         /* if enough space, pretend it was granted */
619                         ungranted += bytes;
620                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
621                         continue;
622                 }
623
624                 /* We can't check for already-mapped blocks here (make sense
625                  * when backend filesystem does not use COW) as it requires
626                  * dropping the grant lock.
627                  * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
628                  * to go through and verify if all of the blocks not marked
629                  *  BRW_GRANTED are already mapped and we can ignore this error.
630                  */
631                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
632                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
633                                 exp->exp_obd->obd_name,
634                                 exp->exp_client_uuid.uuid, exp, i, bytes);
635         }
636
637         /* record in o_grant_used the actual space reserved for the I/O, will be
638          * used later in ofd_grant_commmit() */
639         oa->o_grant_used = granted + ungranted;
640
641         /* Now substract what the clients has used already.  We don't subtract
642          * this from the tot_granted yet, so that other client's can't grab
643          * that space before we have actually allocated our blocks. That
644          * happens in ofd_grant_commit() after the writes are done. */
645         info->fti_used = granted + ungranted;
646         *left -= ungranted;
647         fed->fed_grant -= granted;
648         fed->fed_pending += oa->o_grant_used;
649         ofd->ofd_tot_granted += ungranted;
650         ofd->ofd_tot_pending += oa->o_grant_used;
651
652         CDEBUG(D_CACHE,
653                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
654                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
655                granted, ungranted, fed->fed_grant, fed->fed_dirty);
656
657         if (obd->obd_recovering)
658                 /* don't update dirty accounting during recovery */
659                 RETURN_EXIT;
660
661         if (fed->fed_dirty < granted) {
662                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
663                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
664                        granted, fed->fed_dirty);
665                 granted = fed->fed_dirty;
666         }
667         ofd->ofd_tot_dirty -= granted;
668         fed->fed_dirty -= granted;
669
670         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
671                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
672                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
673                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
674                 spin_unlock(&ofd->ofd_grant_lock);
675                 LBUG();
676         }
677         EXIT;
678 }
679
680 /**
681  * Allocate additional grant space to a client
682  *
683  * Calculate how much grant space to return to client, based on how much space
684  * is currently free and how much of that is already granted.
685  * Caller must hold ofd_grant_lock spinlock.
686  *
687  * \param[in] exp               export of the client which sent the request
688  * \param[in] curgrant          current grant claimed by the client
689  * \param[in] want              how much grant space the client would like to
690  *                              have
691  * \param[in] left              remaining free space with granted space taken
692  *                              out
693  * \param[in] conservative      if set to true, the server should be cautious
694  *                              and limit how much space is granted back to the
695  *                              client. Otherwise, the server should try hard to
696  *                              satisfy the client request.
697  *
698  * \retval                      amount of grant space allocated
699  */
700 static long ofd_grant_alloc(struct obd_export *exp, u64 curgrant,
701                             u64 want, u64 left, bool conservative)
702 {
703         struct obd_device               *obd = exp->exp_obd;
704         struct ofd_device               *ofd = ofd_exp(exp);
705         struct filter_export_data       *fed = &exp->exp_filter_data;
706         long                             grant_chunk;
707         u64                              grant;
708
709         ENTRY;
710
711         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
712                 RETURN(0);
713
714         if (want > 0x7fffffff) {
715                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
716                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
717                 RETURN(0);
718         }
719
720         /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
721          * size while the reality is different */
722         curgrant = ofd_grant_from_cli(exp, ofd, curgrant);
723         want = ofd_grant_from_cli(exp, ofd, want);
724         grant_chunk = ofd_grant_chunk(exp, ofd);
725
726         /* Grant some fraction of the client's requested grant space so that
727          * they are not always waiting for write credits (not all of it to
728          * avoid overgranting in face of multiple RPCs in flight).  This
729          * essentially will be able to control the OSC_MAX_RIF for a client.
730          *
731          * If we do have a large disparity between what the client thinks it
732          * has and what we think it has, don't grant very much and let the
733          * client consume its grant first.  Either it just has lots of RPCs
734          * in flight, or it was evicted and its grants will soon be used up. */
735         if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
736                    RETURN(0);
737
738         if (obd->obd_recovering)
739                 conservative = false;
740
741         if (conservative)
742                 /* don't grant more than 1/8th of the remaining free space in
743                  * one chunk */
744                 left >>= 3;
745         grant = min(want - curgrant, left);
746         /* round grant upt to the next block size */
747         grant = (grant + (1 << ofd->ofd_blockbits) - 1) &
748                 ~((1ULL << ofd->ofd_blockbits) - 1);
749
750         if (!grant)
751                 RETURN(0);
752
753         /* Limit to ofd_grant_chunk() if not reconnect/recovery */
754         if ((grant > grant_chunk) && conservative)
755                 grant = grant_chunk;
756
757         ofd->ofd_tot_granted += grant;
758         fed->fed_grant += grant;
759
760         if (fed->fed_grant < 0) {
761                 CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
762                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
763                        fed->fed_grant, want, curgrant);
764                 spin_unlock(&ofd->ofd_grant_lock);
765                 LBUG();
766         }
767
768         CDEBUG(D_CACHE,
769                "%s: cli %s/%p wants: "LPU64" current grant "LPU64
770                " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
771                exp, want, curgrant, grant);
772         CDEBUG(D_CACHE,
773                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
774                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
775                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
776                obd->obd_num_exports);
777
778         RETURN(ofd_grant_to_cli(exp, ofd, grant));
779 }
780
781 /**
782  * Handle grant space allocation on client connection & reconnection.
783  *
784  * A new non-readonly connection gets an initial grant allocation equals to
785  * ofd_grant_chunk() (i.e. twice the max BRW size in most of the cases).
786  * On reconnection, grant counters between client & OST are resynchronized
787  * and additional space might be granted back if possible.
788  *
789  * \param[in] env       LU environment provided by the caller
790  * \param[in] exp       client's export which is (re)connecting
791  * \param[in] want      how much grant space the client would like to get
792  * \param[in] new_conn  must set to true if this is a new connection and false
793  *                      for a reconnection
794  *
795  * \retval              amount of grant space currently owned by the client
796  */
797 long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
798                        u64 want, bool new_conn)
799 {
800         struct ofd_device               *ofd = ofd_exp(exp);
801         struct filter_export_data       *fed = &exp->exp_filter_data;
802         u64                              left = 0;
803         long                             grant;
804         int                              from_cache;
805         int                              force = 0; /* can use cached data */
806
807         /* don't grant space to client with read-only access */
808         if ((exp_connect_flags(exp) & OBD_CONNECT_RDONLY) ||
809             ofd_grant_prohibit(exp, ofd))
810                 return 0;
811
812 refresh:
813         ofd_grant_statfs(env, exp, force, &from_cache);
814
815         spin_lock(&ofd->ofd_grant_lock);
816
817         /* Grab free space from cached info and take out space already granted
818          * to clients as well as reserved space */
819         left = ofd_grant_space_left(exp);
820
821         /* get fresh statfs data if we are short in ungranted space */
822         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
823                 spin_unlock(&ofd->ofd_grant_lock);
824                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
825                 force = 1;
826                 goto refresh;
827         }
828
829         ofd_grant_alloc(exp,
830                         ofd_grant_to_cli(exp, ofd, (u64)fed->fed_grant),
831                         want, left, new_conn);
832
833         /* return to client its current grant */
834         grant = ofd_grant_to_cli(exp, ofd, (u64)fed->fed_grant);
835         ofd->ofd_tot_granted_clients++;
836
837         spin_unlock(&ofd->ofd_grant_lock);
838
839         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
840                LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
841                exp, grant, want, left);
842
843         return grant;
844 }
845
846 /**
847  * Release all grant space attached to a given export.
848  *
849  * Remove a client from the grant accounting totals.  We also remove
850  * the export from the obd device under the osfs and dev locks to ensure
851  * that the ofd_grant_sanity_check() calculations are always valid.
852  * The client should do something similar when it invalidates its import.
853  *
854  * \param[in] exp       client's export to remove from grant accounting
855  */
856 void ofd_grant_discard(struct obd_export *exp)
857 {
858         struct obd_device               *obd = exp->exp_obd;
859         struct ofd_device               *ofd = ofd_exp(exp);
860         struct filter_export_data       *fed = &exp->exp_filter_data;
861
862         spin_lock(&ofd->ofd_grant_lock);
863         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
864                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
865                  obd->obd_name, ofd->ofd_tot_granted,
866                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
867         ofd->ofd_tot_granted -= fed->fed_grant;
868         fed->fed_grant = 0;
869         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
870                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
871                  obd->obd_name, ofd->ofd_tot_pending,
872                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
873         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
874          * commmits */
875         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
876                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
877                  obd->obd_name, ofd->ofd_tot_dirty,
878                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
879         ofd->ofd_tot_dirty -= fed->fed_dirty;
880         fed->fed_dirty = 0;
881         spin_unlock(&ofd->ofd_grant_lock);
882 }
883
884 /**
885  * Process grant information from incoming bulk read request.
886  *
887  * Extract grant information packed in obdo structure (OBD_MD_FLGRANT set in
888  * o_valid). Bulk reads usually comes with grant announcements (number of dirty
889  * blocks, remaining amount of grant space, ...) and could also include a grant
890  * shrink request. Unlike bulk write, no additional grant space is returned on
891  * bulk read request.
892  *
893  * \param[in] env       is the lu environment provided by the caller
894  * \param[in] exp       is the export of the client which sent the request
895  * \param[in,out] oa    is the incoming obdo sent by the client
896  */
897 void ofd_grant_prepare_read(const struct lu_env *env,
898                             struct obd_export *exp, struct obdo *oa)
899 {
900         struct ofd_device       *ofd = ofd_exp(exp);
901         int                      do_shrink;
902         u64                      left = 0;
903
904         if (!oa)
905                 return;
906
907         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
908                 /* The read request does not contain any grant
909                  * information */
910                 return;
911
912         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
913             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
914                 /* To process grant shrink request, we need to know how much
915                  * available space remains on the backend filesystem.
916                  * Shrink requests are not so common, we always get fresh
917                  * statfs information. */
918                 ofd_grant_statfs(env, exp, 1, NULL);
919
920                 /* protect all grant counters */
921                 spin_lock(&ofd->ofd_grant_lock);
922
923                 /* Grab free space from cached statfs data and take out space
924                  * already granted to clients as well as reserved space */
925                 left = ofd_grant_space_left(exp);
926
927                 /* all set now to proceed with shrinking */
928                 do_shrink = 1;
929         } else {
930                 /* no grant shrinking request packed in the obdo and
931                  * since we don't grant space back on reads, no point
932                  * in running statfs, so just skip it and process
933                  * incoming grant data directly. */
934                 spin_lock(&ofd->ofd_grant_lock);
935                 do_shrink = 0;
936         }
937
938         /* extract incoming grant infomation provided by the client */
939         ofd_grant_incoming(env, exp, oa);
940
941         /* unlike writes, we don't return grants back on reads unless a grant
942          * shrink request was packed and we decided to turn it down. */
943         if (do_shrink)
944                 ofd_grant_shrink(exp, oa, left);
945         else
946                 oa->o_grant = 0;
947
948         spin_unlock(&ofd->ofd_grant_lock);
949 }
950
951 /**
952  * Process grant information from incoming bulk write request.
953  *
954  * This function extracts client's grant announcements from incoming bulk write
955  * request and attempts to allocate grant space for network buffers that need it
956  * (i.e. OBD_BRW_FROM_GRANT not set in rnb_fags).
957  * Network buffers which aren't granted the OBD_BRW_GRANTED flag should not
958  * proceed further and should fail with -ENOSPC.
959  * Whenever possible, additional grant space will be returned to the client
960  * in the bulk write reply.
961  * ofd_grant_prepare_write() must be called before writting any buffers to
962  * the backend storage. This function works in pair with ofd_grant_commit()
963  * which must be invoked once all buffers have been written to disk in order
964  * to release space from the pending grant counter.
965  *
966  * \param[in] env       LU environment provided by the caller
967  * \param[in] exp       export of the client which sent the request
968  * \param[in] oa        incoming obdo sent by the client
969  * \param[in] rnb       list of network buffers
970  * \param[in] niocount  number of network buffers in the list
971  */
972 void ofd_grant_prepare_write(const struct lu_env *env,
973                              struct obd_export *exp, struct obdo *oa,
974                              struct niobuf_remote *rnb, int niocount)
975 {
976         struct obd_device       *obd = exp->exp_obd;
977         struct ofd_device       *ofd = ofd_exp(exp);
978         u64                      left;
979         int                      from_cache;
980         int                      force = 0; /* can use cached data intially */
981         int                      rc;
982
983         ENTRY;
984
985 refresh:
986         /* get statfs information from OSD layer */
987         ofd_grant_statfs(env, exp, force, &from_cache);
988
989         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
990
991         /* Grab free space from cached statfs data and take out space already
992          * granted to clients as well as reserved space */
993         left = ofd_grant_space_left(exp);
994
995         /* Get fresh statfs data if we are short in ungranted space */
996         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
997                 spin_unlock(&ofd->ofd_grant_lock);
998                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
999                        obd->obd_name);
1000                 force = 1;
1001                 goto refresh;
1002         }
1003
1004         /* When close to free space exhaustion, trigger a sync to force
1005          * writeback cache to consume required space immediately and release as
1006          * much space as possible. */
1007         if (!obd->obd_recovering && force != 2 && left < OFD_GRANT_CHUNK) {
1008                 bool from_grant = true;
1009                 int  i;
1010
1011                 /* That said, it is worth running a sync only if some pages did
1012                  * not consume grant space on the client and could thus fail
1013                  * with ENOSPC later in ofd_grant_check() */
1014                 for (i = 0; i < niocount; i++)
1015                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
1016                                 from_grant = false;
1017
1018                 if (!from_grant) {
1019                         /* at least one network buffer requires acquiring grant
1020                          * space on the server */
1021                         spin_unlock(&ofd->ofd_grant_lock);
1022                         /* discard errors, at least we tried ... */
1023                         rc = dt_sync(env, ofd->ofd_osd);
1024                         force = 2;
1025                         goto refresh;
1026                 }
1027         }
1028
1029         /* extract incoming grant information provided by the client */
1030         ofd_grant_incoming(env, exp, oa);
1031
1032         /* check limit */
1033         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
1034
1035         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
1036                 spin_unlock(&ofd->ofd_grant_lock);
1037                 RETURN_EXIT;
1038         }
1039
1040         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
1041          * grant space. */
1042         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1043             (oa->o_flags & OBD_FL_SHRINK_GRANT))
1044                 ofd_grant_shrink(exp, oa, left);
1045         else
1046                 /* grant more space back to the client if possible */
1047                 oa->o_grant = ofd_grant_alloc(exp, oa->o_grant, oa->o_undirty,
1048                                               left, true);
1049         spin_unlock(&ofd->ofd_grant_lock);
1050 }
1051
1052 /**
1053  * Consume grant space reserved for object creation.
1054  *
1055  * Grant space is allocated to the local self export for object precreation.
1056  * This is required to prevent object precreation from consuming grant space
1057  * allocated to client nodes for the data writeback cache.
1058  * This function consumes enough space to create \a nr objects and allocates
1059  * more grant space to the self export for future precreation requests, if
1060  * possible.
1061  *
1062  * \param[in] env       LU environment provided by the caller
1063  * \param[in] exp       export holding the grant space for precreation (= self
1064  *                      export currently)
1065  * \param[in] nr        number of objects to be created
1066  *
1067  * \retval >= 0         amount of grant space allocated to the precreate request
1068  * \retval -ENOSPC      on failure
1069  */
1070 long ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
1071 {
1072         struct ofd_device               *ofd = ofd_exp(exp);
1073         struct filter_export_data       *fed = &exp->exp_filter_data;
1074         u64                              left = 0;
1075         unsigned long                    wanted;
1076         unsigned long                    granted;
1077         ENTRY;
1078
1079         if (exp->exp_obd->obd_recovering ||
1080             ofd->ofd_dt_conf.ddp_inodespace == 0)
1081                 /* don't enforce grant during recovery */
1082                 RETURN(0);
1083
1084         /* Update statfs data if required */
1085         ofd_grant_statfs(env, exp, 1, NULL);
1086
1087         /* protect all grant counters */
1088         spin_lock(&ofd->ofd_grant_lock);
1089
1090         /* fail precreate request if there is not enough blocks available for
1091          * writing */
1092         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
1093             (ofd->ofd_osfs.os_blocks >> 10)) {
1094                 spin_unlock(&ofd->ofd_grant_lock);
1095                 CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
1096                        ofd_name(ofd),
1097                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
1098                 RETURN(-ENOSPC);
1099         }
1100
1101         /* Grab free space from cached statfs data and take out space
1102          * already granted to clients as well as reserved space */
1103         left = ofd_grant_space_left(exp);
1104
1105         /* compute how much space is required to handle the precreation
1106          * request */
1107         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1108         if (wanted > fed->fed_grant + left) {
1109                 /* that's beyond what remains, adjust the number of objects that
1110                  * can be safely precreated */
1111                 wanted = fed->fed_grant + left;
1112                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
1113                 if (*nr == 0) {
1114                         /* we really have no space any more for precreation,
1115                          * fail the precreate request with ENOSPC */
1116                         spin_unlock(&ofd->ofd_grant_lock);
1117                         RETURN(-ENOSPC);
1118                 }
1119                 /* compute space needed for the new number of creations */
1120                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1121         }
1122         LASSERT(wanted <= fed->fed_grant + left);
1123
1124         if (wanted <= fed->fed_grant) {
1125                 /* we've enough grant space to handle this precreate request */
1126                 fed->fed_grant -= wanted;
1127         } else {
1128                 /* we need to take some space from the ungranted pool */
1129                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
1130                 left -= wanted - fed->fed_grant;
1131                 fed->fed_grant = 0;
1132         }
1133         granted = wanted;
1134         fed->fed_pending += granted;
1135         ofd->ofd_tot_pending += granted;
1136
1137         /* grant more space for precreate purpose if possible. */
1138         wanted = OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
1139         if (wanted > fed->fed_grant) {
1140                 /* always try to book enough space to handle a large precreate
1141                  * request */
1142                 wanted -= fed->fed_grant;
1143                 ofd_grant_alloc(exp, fed->fed_grant, wanted, left, false);
1144         }
1145         spin_unlock(&ofd->ofd_grant_lock);
1146         RETURN(granted);
1147 }
1148
1149 /**
1150  * Release grant space added to the pending counter by ofd_grant_prepare_write()
1151  *
1152  * Update pending grant counter once buffers have been written to the disk.
1153  *
1154  * \param[in] exp       export of the client which sent the request
1155  * \param[in] pending   amount of reserved space to be released
1156  * \param[in] rc        return code of pre-commit operations
1157  */
1158 void ofd_grant_commit(struct obd_export *exp, unsigned long pending,
1159                       int rc)
1160 {
1161         struct ofd_device       *ofd  = ofd_exp(exp);
1162         ENTRY;
1163
1164         /* get space accounted in tot_pending for the I/O, set in
1165          * ofd_grant_check() */
1166         if (pending == 0)
1167                 RETURN_EXIT;
1168
1169         spin_lock(&ofd->ofd_grant_lock);
1170         /* Don't update statfs data for errors raised before commit (e.g.
1171          * bulk transfer failed, ...) since we know those writes have not been
1172          * processed. For other errors hit during commit, we cannot really tell
1173          * whether or not something was written, so we update statfs data.
1174          * In any case, this should not be fatal since we always get fresh
1175          * statfs data before failing a request with ENOSPC */
1176         if (rc == 0) {
1177                 spin_lock(&ofd->ofd_osfs_lock);
1178                 /* Take pending out of cached statfs data */
1179                 ofd->ofd_osfs.os_bavail -= min_t(u64,
1180                                                  ofd->ofd_osfs.os_bavail,
1181                                                  pending >> ofd->ofd_blockbits);
1182                 if (ofd->ofd_statfs_inflight)
1183                         /* someone is running statfs and want to be notified of
1184                          * writes happening meanwhile */
1185                         ofd->ofd_osfs_inflight += pending;
1186                 spin_unlock(&ofd->ofd_osfs_lock);
1187         }
1188
1189         if (exp->exp_filter_data.fed_pending < pending) {
1190                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1191                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1192                        exp->exp_filter_data.fed_pending, pending);
1193                 spin_unlock(&ofd->ofd_grant_lock);
1194                 LBUG();
1195         }
1196         exp->exp_filter_data.fed_pending -= pending;
1197
1198         if (ofd->ofd_tot_granted < pending) {
1199                  CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
1200                         "\n", exp->exp_obd->obd_name,
1201                         exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
1202                         pending);
1203                 spin_unlock(&ofd->ofd_grant_lock);
1204                 LBUG();
1205         }
1206         ofd->ofd_tot_granted -= pending;
1207
1208         if (ofd->ofd_tot_pending < pending) {
1209                  CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
1210                         "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1211                         exp, ofd->ofd_tot_pending, pending);
1212                 spin_unlock(&ofd->ofd_grant_lock);
1213                 LBUG();
1214         }
1215         ofd->ofd_tot_pending -= pending;
1216         spin_unlock(&ofd->ofd_grant_lock);
1217         EXIT;
1218 }
1219
1220 struct ofd_grant_cb {
1221         /* commit callback structure */
1222         struct dt_txn_commit_cb  ogc_cb;
1223         /* export associated with the bulk write */
1224         struct obd_export       *ogc_exp;
1225         /* pending grant to be released */
1226         unsigned long            ogc_granted;
1227 };
1228
1229 /**
1230  * Callback function for grant releasing
1231  *
1232  * Release grant space reserved by the client node.
1233  *
1234  * \param[in] env       execution environment
1235  * \param[in] th        transaction handle
1236  * \param[in] cb        callback data
1237  * \param[in] err       error code
1238  */
1239 static void ofd_grant_commit_cb(struct lu_env *env, struct thandle *th,
1240                                 struct dt_txn_commit_cb *cb, int err)
1241 {
1242         struct ofd_grant_cb     *ogc;
1243
1244         ogc = container_of(cb, struct ofd_grant_cb, ogc_cb);
1245
1246         ofd_grant_commit(ogc->ogc_exp, ogc->ogc_granted, err);
1247         class_export_cb_put(ogc->ogc_exp);
1248         OBD_FREE_PTR(ogc);
1249 }
1250
1251 /**
1252  * Add callback for grant releasing
1253  *
1254  * Register a commit callback to release grant space.
1255  *
1256  * \param[in] th        transaction handle
1257  * \param[in] exp       OBD export of client
1258  * \param[in] granted   amount of grant space to be released upon commit
1259  *
1260  * \retval              0 on successful callback adding
1261  * \retval              negative value on error
1262  */
1263 int ofd_grant_commit_cb_add(struct thandle *th, struct obd_export *exp,
1264                             unsigned long granted)
1265 {
1266         struct ofd_grant_cb     *ogc;
1267         struct dt_txn_commit_cb *dcb;
1268         int                      rc;
1269         ENTRY;
1270
1271         OBD_ALLOC_PTR(ogc);
1272         if (ogc == NULL)
1273                 RETURN(-ENOMEM);
1274
1275         ogc->ogc_exp = class_export_cb_get(exp);
1276         ogc->ogc_granted = granted;
1277
1278         dcb = &ogc->ogc_cb;
1279         dcb->dcb_func = ofd_grant_commit_cb;
1280         INIT_LIST_HEAD(&dcb->dcb_linkage);
1281         strlcpy(dcb->dcb_name, "ofd_grant_commit_cb", sizeof(dcb->dcb_name));
1282
1283         rc = dt_trans_cb_add(th, dcb);
1284         if (rc) {
1285                 class_export_cb_put(ogc->ogc_exp);
1286                 OBD_FREE_PTR(ogc);
1287         }
1288
1289         RETURN(rc);
1290 }