Whamcloud - gitweb
ed92f14cdb96f3d8f6198fbbe41e5e0ca860dbf8
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * lustre/ofd/ofd_grant.c
30  *
31  * This file provides code related to grant space management on Object Storage
32  * Targets (OSTs). Grant is a mechanism used by client nodes to reserve disk
33  * space on OSTs for the data writeback cache. The Lustre client is thus assured
34  * that enough space will be available when flushing dirty pages asynchronously.
35  * Each client node is granted an initial amount of reserved space at connect
36  * time and gets additional space back from OST in bulk write reply.
37  *
38  * We actually support three different cases:
39  * - The client supports the new grant parameters (i.e. OBD_CONNECT_GRANT_PARAM)
40  *   which means that all grant overhead calculation happens on the client side.
41  *   The server reports at connect time the backend filesystem block size, the
42  *   maximum extent size as well as the extent insertion cost and it is then up
43  *   to the osc layer to the track dirty extents and consume grant accordingly
44  *   (see osc_cache.c). In each bulk write request, the client provides how much
45  *   grant space was consumed for this RPC.
46  * - The client does not support OBD_CONNECT_GRANT_PARAM and always assumes a
47  *   a backend file system block size of 4KB. We then have two cases:
48  *   - If the block size is really 4KB, then the client can deal with grant
49  *     allocation for partial block writes, but won't take extent insertion cost
50  *     into account. For such clients, we inflate grant by 100% on the server
51  *     side. It means that when 32MB of grant is hold by the client, 64MB of
52  *     grant space is actually reserved on the server. All grant counters
53  *     provided by such a client are inflated by 100%.
54  *   - The backend filesystem block size is bigger than 4KB, which isn't
55  *     supported by the client. In this case, we emulate a 4KB block size and
56  *     consume one block size on the server for each 4KB of grant returned to
57  *     client. With a 128KB blocksize, it means that 32MB dirty pages of 4KB
58  *     on the client will actually consume 1GB of grant on the server.
59  *     All grant counters provided by such a client are inflated by the block
60  *     size ratio.
61  *
62  * This file handles the core logic for:
63  * - grant allocation strategy
64  * - maintaining per-client as well as global grant space accounting
65  * - processing grant information packed in incoming requests
66  * - allocating server-side grant space for synchronous write RPCs which did not
67  *   consume grant on the client side (OBD_BRW_FROM_GRANT flag not set). If not
68  *   enough space is available, such RPCs fail with ENOSPC
69  *
70  * Author: Johann Lombardi <johann.lombardi@intel.com>
71  */
72
73 #define DEBUG_SUBSYSTEM S_FILTER
74
75 #include "ofd_internal.h"
76
77 /* Clients typically hold 2x their max_rpcs_in_flight of grant space */
78 #define OFD_GRANT_SHRINK_LIMIT(exp)     (2ULL * 8 * exp_max_brw_size(exp))
79
80 /* Helpers to inflate/deflate grants for clients that do not support the grant
81  * parameters */
82 static inline u64 ofd_grant_inflate(struct ofd_device *ofd, u64 val)
83 {
84         if (ofd->ofd_blockbits > COMPAT_BSIZE_SHIFT)
85                 /* Client does not support such large block size, grant
86                  * is thus inflated. We already significantly overestimate
87                  * overhead, no need to add the extent tax in this case */
88                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
89         /* client can deal with the block size, but does not support per-extent
90          * grant accounting, inflate grant by 100% for such clients */
91         return val << 1;
92 }
93
94 /* Companion of ofd_grant_inflate() */
95 static inline u64 ofd_grant_deflate(struct ofd_device *ofd, u64 val)
96 {
97         if (ofd->ofd_blockbits > COMPAT_BSIZE_SHIFT)
98                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
99         return val >> 1;
100 }
101
102 /* Grant chunk is used as a unit for grant allocation. It should be inflated
103  * if the client does not support the grant paramaters.
104  * Check connection flag against \a data if not NULL. This is used during
105  * connection creation where exp->exp_connect_data isn't populated yet */
106 static inline u64 ofd_grant_chunk(struct obd_export *exp,
107                                   struct ofd_device *ofd,
108                                   struct obd_connect_data *data)
109 {
110         u64 chunk = exp_max_brw_size(exp);
111         u64 tax;
112
113         if (ofd_obd(ofd)->obd_self_export == exp)
114                 /* Grant enough space to handle a big precreate request */
115                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
116
117         if ((data == NULL && !ofd_grant_param_supp(exp)) ||
118             (data != NULL && !OCD_HAS_FLAG(data, GRANT_PARAM)))
119                 /* Try to grant enough space to send a full-size RPC */
120                 return ofd_grant_inflate(ofd, chunk);
121
122         /* Try to return enough to send two full-size RPCs
123          * = 2 * (BRW_size + #extents_in_BRW * grant_tax) */
124         tax = 1ULL << ofd->ofd_blockbits;            /* block size */
125         tax *= ofd->ofd_dt_conf.ddp_max_extent_blks; /* max extent size */
126         tax = (chunk + tax - 1) / tax;               /* #extents in a RPC */
127         tax *= ofd->ofd_dt_conf.ddp_extent_tax;      /* extent tax for a RPC */
128         chunk = (chunk + tax) * 2;                   /* we said two full RPCs */
129         return chunk;
130 }
131
132 /**
133  * Perform extra sanity checks for grant accounting.
134  *
135  * This function scans the export list, sanity checks per-export grant counters
136  * and verifies accuracy of global grant accounting. If an inconsistency is
137  * found, a CERROR is printed with the function name \func that was passed as
138  * argument. LBUG is only called in case of serious counter corruption (i.e.
139  * value larger than the device size).
140  * Those sanity checks can be pretty expensive and are disabled if the OBD
141  * device has more than 100 connected exports.
142  *
143  * \param[in] obd       OBD device for which grant accounting should be
144  *                      verified
145  * \param[in] func      caller's function name
146  */
147 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
148 {
149         struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
150         struct obd_export *exp;
151         u64                maxsize;
152         u64                tot_dirty = 0;
153         u64                tot_pending = 0;
154         u64                tot_granted = 0;
155         u64                fo_tot_granted;
156         u64                fo_tot_pending;
157         u64                fo_tot_dirty;
158
159         if (list_empty(&obd->obd_exports))
160                 return;
161
162         /* We don't want to do this for large machines that do lots of
163          * mounts or unmounts.  It burns... */
164         if (obd->obd_num_exports > 100)
165                 return;
166
167         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
168
169         spin_lock(&obd->obd_dev_lock);
170         spin_lock(&ofd->ofd_grant_lock);
171         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
172                 struct filter_export_data       *fed;
173                 int                              error = 0;
174
175                 fed = &exp->exp_filter_data;
176
177                 if (obd->obd_self_export == exp)
178                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
179                                "%ld\n", obd->obd_name, fed->fed_grant,
180                                fed->fed_pending, fed->fed_dirty);
181
182                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
183                     fed->fed_dirty < 0)
184                         error = 1;
185                 if (fed->fed_grant + fed->fed_pending > maxsize) {
186                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
187                                " > maxsize(%llu)\n", obd->obd_name,
188                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
189                                fed->fed_pending, maxsize);
190                         spin_unlock(&obd->obd_dev_lock);
191                         spin_unlock(&ofd->ofd_grant_lock);
192                         LBUG();
193                 }
194                 if (fed->fed_dirty > maxsize) {
195                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize(%llu"
196                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
197                                exp, fed->fed_dirty, maxsize);
198                         spin_unlock(&obd->obd_dev_lock);
199                         spin_unlock(&ofd->ofd_grant_lock);
200                         LBUG();
201                 }
202                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
203                              "%ld pend %ld grant %ld\n", obd->obd_name,
204                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
205                              fed->fed_pending, fed->fed_grant);
206                 tot_granted += fed->fed_grant + fed->fed_pending;
207                 tot_pending += fed->fed_pending;
208                 tot_dirty += fed->fed_dirty;
209         }
210
211         /* exports about to be unlinked should also be taken into account since
212          * they might still hold pending grant space to be released at
213          * commit time */
214         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain) {
215                 struct filter_export_data       *fed;
216                 int                              error = 0;
217
218                 fed = &exp->exp_filter_data;
219
220                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
221                     fed->fed_dirty < 0)
222                         error = 1;
223                 if (fed->fed_grant + fed->fed_pending > maxsize) {
224                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
225                                " > maxsize(%llu)\n", obd->obd_name,
226                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
227                                fed->fed_pending, maxsize);
228                         spin_unlock(&obd->obd_dev_lock);
229                         spin_unlock(&ofd->ofd_grant_lock);
230                         LBUG();
231                 }
232                 if (fed->fed_dirty > maxsize) {
233                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize(%llu"
234                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
235                                exp, fed->fed_dirty, maxsize);
236                         spin_unlock(&obd->obd_dev_lock);
237                         spin_unlock(&ofd->ofd_grant_lock);
238                         LBUG();
239                 }
240                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
241                              "%ld pend %ld grant %ld\n", obd->obd_name,
242                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
243                              fed->fed_pending, fed->fed_grant);
244                 tot_granted += fed->fed_grant + fed->fed_pending;
245                 tot_pending += fed->fed_pending;
246                 tot_dirty += fed->fed_dirty;
247         }
248
249         fo_tot_granted = ofd->ofd_tot_granted;
250         fo_tot_pending = ofd->ofd_tot_pending;
251         fo_tot_dirty = ofd->ofd_tot_dirty;
252         spin_unlock(&obd->obd_dev_lock);
253         spin_unlock(&ofd->ofd_grant_lock);
254
255         if (tot_granted != fo_tot_granted)
256                 CERROR("%s: tot_granted %llu != fo_tot_granted %llu\n",
257                        func, tot_granted, fo_tot_granted);
258         if (tot_pending != fo_tot_pending)
259                 CERROR("%s: tot_pending %llu != fo_tot_pending %llu\n",
260                        func, tot_pending, fo_tot_pending);
261         if (tot_dirty != fo_tot_dirty)
262                 CERROR("%s: tot_dirty %llu != fo_tot_dirty %llu\n",
263                        func, tot_dirty, fo_tot_dirty);
264         if (tot_pending > tot_granted)
265                 CERROR("%s: tot_pending %llu > tot_granted %llu\n",
266                        func, tot_pending, tot_granted);
267         if (tot_granted > maxsize)
268                 CERROR("%s: tot_granted %llu > maxsize %llu\n",
269                        func, tot_granted, maxsize);
270         if (tot_dirty > maxsize)
271                 CERROR("%s: tot_dirty %llu > maxsize %llu\n",
272                        func, tot_dirty, maxsize);
273 }
274
275 /**
276  * Update cached statfs information from the OSD layer
277  *
278  * Refresh statfs information cached in ofd::ofd_osfs if the cache is older
279  * than 1s or if force is set. The OSD layer is in charge of estimating data &
280  * metadata overhead.
281  * This function can sleep so it should not be called with any spinlock held.
282  *
283  * \param[in] env               LU environment passed by the caller
284  * \param[in] exp               export used to print client info in debug
285  *                              messages
286  * \param[in] force             force a refresh of statfs information
287  * \param[out] from_cache       returns whether the statfs information are
288  *                              taken from cache
289  */
290 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
291                              int force, int *from_cache)
292 {
293         struct obd_device       *obd = exp->exp_obd;
294         struct ofd_device       *ofd = ofd_exp(exp);
295         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
296         __u64                    max_age;
297         int                      rc;
298
299         if (force)
300                 max_age = 0; /* get fresh statfs data */
301         else
302                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
303
304         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
305         if (unlikely(rc)) {
306                 if (from_cache)
307                         *from_cache = 0;
308                 return;
309         }
310
311         CDEBUG(D_CACHE, "%s: cli %s/%p free: %llu avail: %llu\n",
312                obd->obd_name, exp->exp_client_uuid.uuid, exp,
313                osfs->os_bfree << ofd->ofd_blockbits,
314                osfs->os_bavail << ofd->ofd_blockbits);
315 }
316
317 /**
318  * Figure out how much space is available on the backend filesystem after
319  * removing grant space already booked by clients.
320  *
321  * This is done by accessing cached statfs data previously populated by
322  * ofd_grant_statfs(), from which we withdraw the space already granted to
323  * clients and the reserved space.
324  * Caller must hold ofd_grant_lock spinlock.
325  *
326  * \param[in] exp       export associated with the device for which the amount
327  *                      of available space is requested
328  * \retval              amount of non-allocated space, in bytes
329  */
330 static u64 ofd_grant_space_left(struct obd_export *exp)
331 {
332         struct obd_device *obd = exp->exp_obd;
333         struct ofd_device *ofd = ofd_exp(exp);
334         u64                tot_granted;
335         u64                left;
336         u64                avail;
337         u64                unstable;
338
339         ENTRY;
340         assert_spin_locked(&ofd->ofd_grant_lock);
341
342         spin_lock(&ofd->ofd_osfs_lock);
343         /* get available space from cached statfs data */
344         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
345         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
346         spin_unlock(&ofd->ofd_osfs_lock);
347
348         tot_granted = ofd->ofd_tot_granted;
349
350         if (left < tot_granted) {
351                 int mask = (left + unstable <
352                             tot_granted - ofd->ofd_tot_pending) ?
353                             D_ERROR : D_CACHE;
354
355                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left %llu < tot_grant "
356                              "%llu unstable %llu pending %llu "
357                              "dirty %llu\n",
358                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
359                              left, tot_granted, unstable,
360                              ofd->ofd_tot_pending, ofd->ofd_tot_dirty);
361                 RETURN(0);
362         }
363
364         avail = left;
365         /* Withdraw space already granted to clients */
366         left -= tot_granted;
367
368         /* Align left on block size */
369         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
370
371         CDEBUG(D_CACHE, "%s: cli %s/%p avail %llu left %llu unstable "
372                "%llu tot_grant %llu pending %llu\n", obd->obd_name,
373                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
374                tot_granted, ofd->ofd_tot_pending);
375
376         RETURN(left);
377 }
378
379 /**
380  * Process grant information from obdo structure packed in incoming BRW
381  * and inflate grant counters if required.
382  *
383  * Grab the dirty and seen grant announcements from the incoming obdo and
384  * inflate all grant counters passed in the request if the client does not
385  * support the grant parameters.
386  * We will later calculate the client's new grant and return it.
387  * Caller must hold ofd_grant_lock spinlock.
388  *
389  * \param[in] env       LU environment supplying osfs storage
390  * \param[in] exp       export for which we received the request
391  * \param[in,out] oa    incoming obdo sent by the client
392  */
393 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
394                                struct obdo *oa, long chunk)
395 {
396         struct filter_export_data       *fed;
397         struct ofd_device               *ofd = ofd_exp(exp);
398         struct obd_device               *obd = exp->exp_obd;
399         long                             dirty;
400         long                             dropped;
401         ENTRY;
402
403         assert_spin_locked(&ofd->ofd_grant_lock);
404
405         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
406                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
407                 oa->o_valid &= ~OBD_MD_FLGRANT;
408                 RETURN_EXIT;
409         }
410
411         fed = &exp->exp_filter_data;
412
413         /* Add some margin, since there is a small race if other RPCs arrive
414          * out-or-order and have already consumed some grant.  We want to
415          * leave this here in case there is a large error in accounting. */
416         CDEBUG(D_CACHE,
417                "%s: cli %s/%p reports grant %llu dropped %u, local %lu\n",
418                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
419                oa->o_dropped, fed->fed_grant);
420
421         if ((long long)oa->o_dirty < 0)
422                 oa->o_dirty = 0;
423
424         /* inflate grant counters if required */
425         if (!ofd_grant_param_supp(exp)) {
426                 oa->o_grant     = ofd_grant_inflate(ofd, oa->o_grant);
427                 oa->o_dirty     = ofd_grant_inflate(ofd, oa->o_dirty);
428                 oa->o_dropped   = ofd_grant_inflate(ofd, (u64)oa->o_dropped);
429                 oa->o_undirty   = ofd_grant_inflate(ofd, oa->o_undirty);
430         }
431
432         dirty = oa->o_dirty;
433         dropped = oa->o_dropped;
434
435         /* Update our accounting now so that statfs takes it into account.
436          * Note that fed_dirty is only approximate and can become incorrect
437          * if RPCs arrive out-of-order.  No important calculations depend
438          * on fed_dirty however, but we must check sanity to not assert. */
439         if (dirty > fed->fed_grant + 4 * chunk)
440                 dirty = fed->fed_grant + 4 * chunk;
441         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
442         if (fed->fed_grant < dropped) {
443                 CDEBUG(D_CACHE,
444                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
445                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
446                        fed->fed_grant);
447                 dropped = 0;
448         }
449         if (ofd->ofd_tot_granted < dropped) {
450                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant %llu\n",
451                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
452                        dropped, ofd->ofd_tot_granted);
453                 dropped = 0;
454         }
455         ofd->ofd_tot_granted -= dropped;
456         fed->fed_grant -= dropped;
457         fed->fed_dirty = dirty;
458
459         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
460                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
461                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
462                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
463                 spin_unlock(&ofd->ofd_grant_lock);
464                 LBUG();
465         }
466         EXIT;
467 }
468
469 /**
470  * Grant shrink request handler.
471  *
472  * Client nodes can explicitly release grant space (i.e. process called grant
473  * shrinking). This function proceeds with the shrink request when there is
474  * less ungranted space remaining than the amount all of the connected clients
475  * would consume if they used their full grant.
476  * Caller must hold ofd_grant_lock spinlock.
477  *
478  * \param[in] exp               export releasing grant space
479  * \param[in,out] oa            incoming obdo sent by the client
480  * \param[in] left_space        remaining free space with space already granted
481  *                              taken out
482  */
483 static void ofd_grant_shrink(struct obd_export *exp, struct obdo *oa,
484                              u64 left_space)
485 {
486         struct filter_export_data       *fed;
487         struct ofd_device               *ofd = ofd_exp(exp);
488         struct obd_device               *obd = exp->exp_obd;
489         long                             grant_shrink;
490
491         assert_spin_locked(&ofd->ofd_grant_lock);
492         LASSERT(exp);
493         if (left_space >= ofd->ofd_tot_granted_clients *
494                           OFD_GRANT_SHRINK_LIMIT(exp))
495                 return;
496
497         grant_shrink = oa->o_grant;
498
499         fed = &exp->exp_filter_data;
500         fed->fed_grant       -= grant_shrink;
501         ofd->ofd_tot_granted -= grant_shrink;
502
503         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total %llu\n",
504                obd->obd_name, exp->exp_client_uuid.uuid, exp, grant_shrink,
505                fed->fed_grant, ofd->ofd_tot_granted);
506
507         /* client has just released some grant, don't grant any space back */
508         oa->o_grant = 0;
509 }
510
511 /**
512  * Calculate how much space is required to write a given network buffer
513  *
514  * This function takes block alignment into account to estimate how much on-disk
515  * space will be required to successfully write the whole niobuf.
516  * Estimated space is inflated if the export does not support
517  * OBD_CONNECT_GRANT_PARAM and if the backend filesystem has a block size
518  * larger than the minimal supported page size (i.e. 4KB).
519  *
520  * \param[in] exp       export associated which the write request
521  *                      if NULL, then size estimate is done for server-side
522  *                      grant allocation.
523  * \param[in] ofd       ofd device handling the request
524  * \param[in] rnb       network buffer to estimate size of
525  *
526  * \retval              space (in bytes) that will be consumed to write the
527  *                      network buffer
528  */
529 static inline u64 ofd_grant_rnb_size(struct obd_export *exp,
530                                      struct ofd_device *ofd,
531                                      struct niobuf_remote *rnb)
532 {
533         u64 blksize;
534         u64 bytes;
535         u64 end;
536
537         if (exp && !ofd_grant_param_supp(exp) &&
538             ofd->ofd_blockbits > COMPAT_BSIZE_SHIFT)
539                 blksize = 1ULL << COMPAT_BSIZE_SHIFT;
540         else
541                 blksize = 1ULL << ofd->ofd_blockbits;
542
543         /* The network buffer might span several blocks, align it on block
544          * boundaries */
545         bytes  = rnb->rnb_offset & (blksize - 1);
546         bytes += rnb->rnb_len;
547         end    = bytes & (blksize - 1);
548         if (end)
549                 bytes += blksize - end;
550
551         if (exp == NULL || ofd_grant_param_supp(exp)) {
552                 /* add per-extent insertion cost */
553                 u64 max_ext;
554                 int nr_ext;
555
556                 max_ext = blksize * ofd->ofd_dt_conf.ddp_max_extent_blks;
557                 nr_ext = (bytes + max_ext - 1) / max_ext;
558                 bytes += nr_ext * ofd->ofd_dt_conf.ddp_extent_tax;
559         } else {
560                 /* Inflate grant space if client does not support extent-based
561                  * grant allocation */
562                 bytes = ofd_grant_inflate(ofd, (u64)bytes);
563         }
564
565         return bytes;
566 }
567
568 /**
569  * Validate grant accounting for each incoming remote network buffer.
570  *
571  * When clients have dirtied as much space as they've been granted they
572  * fall through to sync writes. These sync writes haven't been expressed
573  * in grants and need to error with ENOSPC when there isn't room in the
574  * filesystem for them after grants are taken into account. However,
575  * writeback of the dirty data that was already granted space can write
576  * right on through.
577  * The OBD_BRW_GRANTED flag will be set in the rnb_flags of each network
578  * buffer which has been granted enough space to proceed. Buffers without
579  * this flag will fail to be written with -ENOSPC (see ofd_preprw_write().
580  * Caller must hold ofd_grant_lock spinlock.
581  *
582  * \param[in] env       LU environment passed by the caller
583  * \param[in] exp       export identifying the client which sent the RPC
584  * \param[in] oa        incoming obdo in which we should return the pack the
585  *                      additional grant
586  * \param[in,out] rnb   the list of network buffers
587  * \param[in] niocount  the number of network buffers in the list
588  * \param[in] left      the remaining free space with space already granted
589  *                      taken out
590  */
591 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
592                             struct obdo *oa, struct niobuf_remote *rnb,
593                             int niocount, u64 *left)
594 {
595         struct filter_export_data       *fed = &exp->exp_filter_data;
596         struct obd_device               *obd = exp->exp_obd;
597         struct ofd_device               *ofd = ofd_exp(exp);
598         unsigned long                    ungranted = 0;
599         unsigned long                    granted = 0;
600         int                              i;
601         bool                             skip = false;
602         struct ofd_thread_info          *info = ofd_info(env);
603
604         ENTRY;
605
606         assert_spin_locked(&ofd->ofd_grant_lock);
607
608         if (obd->obd_recovering) {
609                 /* Replaying write. Grant info have been processed already so no
610                  * need to do any enforcement here. It is worth noting that only
611                  * bulk writes with all rnbs having OBD_BRW_FROM_GRANT can be
612                  * replayed. If one page hasn't OBD_BRW_FROM_GRANT set, then
613                  * the whole bulk is written synchronously */
614                 skip = true;
615                 CDEBUG(D_CACHE, "Replaying write, skipping accounting\n");
616         } else if ((oa->o_valid & OBD_MD_FLFLAGS) &&
617                    (oa->o_flags & OBD_FL_RECOV_RESEND)) {
618                 /* Recoverable resend, grant info have already been processed as
619                  * well */
620                 skip = true;
621                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
622                                 "accounting\n");
623         } else if (ofd_grant_param_supp(exp) && oa->o_grant_used > 0) {
624                 /* Client supports the new grant parameters and is telling us
625                  * how much grant space it consumed for this bulk write.
626                  * Although all rnbs are supposed to have the OBD_BRW_FROM_GRANT
627                  * flag set, we will scan the rnb list and looks for non-cache
628                  * I/O in case it changes in the future */
629                 if (fed->fed_grant >= oa->o_grant_used) {
630                         /* skip grant accounting for rnbs with
631                          * OBD_BRW_FROM_GRANT and just used grant consumption
632                          * claimed in the request */
633                         granted = oa->o_grant_used;
634                         skip = true;
635                 } else {
636                         /* client has used more grants for this request that
637                          * it owns ... */
638                         CERROR("%s: cli %s claims %lu GRANT, real grant %lu\n",
639                                exp->exp_obd->obd_name,
640                                exp->exp_client_uuid.uuid,
641                                (unsigned long)oa->o_grant_used, fed->fed_grant);
642
643                         /* check whether we can fill the gap with unallocated
644                          * grant */
645                         if (*left > (oa->o_grant_used - fed->fed_grant)) {
646                                 /* ouf .. we are safe for now */
647                                 granted = fed->fed_grant;
648                                 ungranted = oa->o_grant_used - granted;
649                                 *left -= ungranted;
650                                 skip = true;
651                         }
652                         /* too bad, but we cannot afford to blow up our grant
653                          * accounting. The loop below will handle each rnb in
654                          * case by case. */
655                 }
656         }
657
658         for (i = 0; i < niocount; i++) {
659                 int bytes;
660
661                 if ((rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
662                         if (skip) {
663                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
664                                 continue;
665                         }
666
667                         /* compute how much grant space is actually needed for
668                          * this rnb, inflate grant if required */
669                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
670                         if (fed->fed_grant >= granted + bytes) {
671                                 granted += bytes;
672                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
673                                 continue;
674                         }
675
676                         CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d GRANT, "
677                                "real grant %lu idx %d\n", obd->obd_name,
678                                exp->exp_client_uuid.uuid, exp, granted, bytes,
679                                fed->fed_grant, i);
680                 }
681
682                 if (obd->obd_recovering)
683                         CERROR("%s: cli %s is replaying OST_WRITE while one rnb"
684                                " hasn't OBD_BRW_FROM_GRANT set (0x%x)\n",
685                                obd->obd_name, exp->exp_client_uuid.uuid,
686                                rnb[i].rnb_flags);
687
688                 /* Consume grant space on the server.
689                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
690                  * so that the required grant space isn't inflated. This is
691                  * done on purpose since the server can deal with large block
692                  * size, unlike some clients */
693                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
694                 if (*left > bytes) {
695                         /* if enough space, pretend it was granted */
696                         ungranted += bytes;
697                         *left -= bytes;
698                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
699                         continue;
700                 }
701
702                 /* We can't check for already-mapped blocks here (make sense
703                  * when backend filesystem does not use COW) as it requires
704                  * dropping the grant lock.
705                  * Instead, we clear OBD_BRW_GRANTED and in that case we need
706                  * to go through and verify if all of the blocks not marked
707                  *  BRW_GRANTED are already mapped and we can ignore this error.
708                  */
709                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
710                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
711                        obd->obd_name, exp->exp_client_uuid.uuid, exp, i, bytes);
712         }
713
714         /* record in o_grant_used the actual space reserved for the I/O, will be
715          * used later in ofd_grant_commmit() */
716         oa->o_grant_used = granted + ungranted;
717         info->fti_used = granted + ungranted;
718
719         /* record space used for the I/O, will be used in ofd_grant_commmit() */
720         /* Now substract what the clients has used already.  We don't subtract
721          * this from the tot_granted yet, so that other client's can't grab
722          * that space before we have actually allocated our blocks. That
723          * happens in ofd_grant_commit() after the writes are done. */
724         fed->fed_grant -= granted;
725         fed->fed_pending += oa->o_grant_used;
726         ofd->ofd_tot_granted += ungranted;
727         ofd->ofd_tot_pending += oa->o_grant_used;
728
729         CDEBUG(D_CACHE,
730                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
731                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
732                granted, ungranted, fed->fed_grant, fed->fed_dirty);
733
734         if (obd->obd_recovering || (oa->o_valid & OBD_MD_FLGRANT) == 0)
735                 /* don't update dirty accounting during recovery or
736                  * if grant information got discarded (e.g. during resend) */
737                 RETURN_EXIT;
738
739         if (fed->fed_dirty < granted) {
740                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
741                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
742                        granted, fed->fed_dirty);
743                 granted = fed->fed_dirty;
744         }
745         ofd->ofd_tot_dirty -= granted;
746         fed->fed_dirty -= granted;
747
748         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
749                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
750                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
751                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
752                 spin_unlock(&ofd->ofd_grant_lock);
753                 LBUG();
754         }
755         EXIT;
756 }
757
758 /**
759  * Allocate additional grant space to a client
760  *
761  * Calculate how much grant space to return to client, based on how much space
762  * is currently free and how much of that is already granted.
763  * Caller must hold ofd_grant_lock spinlock.
764  *
765  * \param[in] exp               export of the client which sent the request
766  * \param[in] curgrant          current grant claimed by the client
767  * \param[in] want              how much grant space the client would like to
768  *                              have
769  * \param[in] left              remaining free space with granted space taken
770  *                              out
771  * \param[in] conservative      if set to true, the server should be cautious
772  *                              and limit how much space is granted back to the
773  *                              client. Otherwise, the server should try hard to
774  *                              satisfy the client request.
775  *
776  * \retval                      amount of grant space allocated
777  */
778 static long ofd_grant_alloc(struct obd_export *exp, u64 curgrant,
779                             u64 want, u64 left, long chunk,
780                             bool conservative)
781 {
782         struct obd_device               *obd = exp->exp_obd;
783         struct ofd_device               *ofd = ofd_exp(exp);
784         struct filter_export_data       *fed = &exp->exp_filter_data;
785         u64                              grant;
786
787         ENTRY;
788
789         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
790                 RETURN(0);
791
792         if (want > 0x7fffffff) {
793                 CERROR("%s: client %s/%p requesting > 2GB grant %llu\n",
794                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
795                 RETURN(0);
796         }
797
798         /* Grant some fraction of the client's requested grant space so that
799          * they are not always waiting for write credits (not all of it to
800          * avoid overgranting in face of multiple RPCs in flight).  This
801          * essentially will be able to control the OSC_MAX_RIF for a client.
802          *
803          * If we do have a large disparity between what the client thinks it
804          * has and what we think it has, don't grant very much and let the
805          * client consume its grant first.  Either it just has lots of RPCs
806          * in flight, or it was evicted and its grants will soon be used up. */
807         if (curgrant >= want || curgrant >= fed->fed_grant + chunk)
808                 RETURN(0);
809
810         if (obd->obd_recovering)
811                 conservative = false;
812
813         if (conservative)
814                 /* don't grant more than 1/8th of the remaining free space in
815                  * one chunk */
816                 left >>= 3;
817         grant = min(want - curgrant, left);
818         /* round grant up to the next block size */
819         grant = (grant + (1 << ofd->ofd_blockbits) - 1) &
820                 ~((1ULL << ofd->ofd_blockbits) - 1);
821
822         if (!grant)
823                 RETURN(0);
824
825         /* Limit to grant_chunk if not reconnect/recovery */
826         if ((grant > chunk) && conservative)
827                 grant = chunk;
828
829         ofd->ofd_tot_granted += grant;
830         fed->fed_grant += grant;
831
832         if (fed->fed_grant < 0) {
833                 CERROR("%s: cli %s/%p grant %ld want %llu current %llu\n",
834                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
835                        fed->fed_grant, want, curgrant);
836                 spin_unlock(&ofd->ofd_grant_lock);
837                 LBUG();
838         }
839
840         CDEBUG(D_CACHE,
841                "%s: cli %s/%p wants: %llu current grant %llu"
842                " granting: %llu\n", obd->obd_name, exp->exp_client_uuid.uuid,
843                exp, want, curgrant, grant);
844         CDEBUG(D_CACHE,
845                "%s: cli %s/%p tot cached:%llu granted:%llu"
846                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
847                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
848                obd->obd_num_exports);
849
850         RETURN(grant);
851 }
852
853 /**
854  * Handle grant space allocation on client connection & reconnection.
855  *
856  * A new non-readonly connection gets an initial grant allocation equals to
857  * ofd_grant_chunk() (i.e. twice the max BRW size in most of the cases).
858  * On reconnection, grant counters between client & OST are resynchronized
859  * and additional space might be granted back if possible.
860  *
861  * \param[in] env       LU environment provided by the caller
862  * \param[in] exp       client's export which is (re)connecting
863  * \param[in,out] data  obd_connect_data structure sent by the client in the
864  *                      connect request
865  * \param[in] new_conn  must set to true if this is a new connection and false
866  *                      for a reconnection
867  */
868 void ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
869                        struct obd_connect_data *data, bool new_conn)
870 {
871         struct ofd_device               *ofd = ofd_exp(exp);
872         struct filter_export_data       *fed = &exp->exp_filter_data;
873         u64                              left = 0;
874         u64                              want;
875         long                             chunk;
876         int                              from_cache;
877         int                              force = 0; /* can use cached data */
878
879         /* don't grant space to client with read-only access */
880         if (OCD_HAS_FLAG(data, RDONLY) ||
881             (!OCD_HAS_FLAG(data, GRANT_PARAM) &&
882              ofd->ofd_grant_compat_disable)) {
883                 data->ocd_grant = 0;
884                 data->ocd_connect_flags &= ~(OBD_CONNECT_GRANT |
885                                              OBD_CONNECT_GRANT_PARAM);
886                 RETURN_EXIT;
887         }
888
889         if (OCD_HAS_FLAG(data, GRANT_PARAM))
890                 want = data->ocd_grant;
891         else
892                 want = ofd_grant_inflate(ofd, data->ocd_grant);
893         chunk = ofd_grant_chunk(exp, ofd, data);
894 refresh:
895         ofd_grant_statfs(env, exp, force, &from_cache);
896
897         spin_lock(&ofd->ofd_grant_lock);
898
899         /* Grab free space from cached info and take out space already granted
900          * to clients as well as reserved space */
901         left = ofd_grant_space_left(exp);
902
903         /* get fresh statfs data if we are short in ungranted space */
904         if (from_cache && left < 32 * chunk) {
905                 spin_unlock(&ofd->ofd_grant_lock);
906                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
907                 force = 1;
908                 goto refresh;
909         }
910
911         ofd_grant_alloc(exp, (u64)fed->fed_grant, want, left, chunk, new_conn);
912
913         /* return to client its current grant */
914         if (OCD_HAS_FLAG(data, GRANT_PARAM))
915                 data->ocd_grant = fed->fed_grant;
916         else
917                 /* deflate grant */
918                 data->ocd_grant = ofd_grant_deflate(ofd,
919                                                     (u64)fed->fed_grant);
920
921         /* reset dirty accounting */
922         ofd->ofd_tot_dirty -= fed->fed_dirty;
923         fed->fed_dirty = 0;
924
925         if (new_conn && OCD_HAS_FLAG(data, GRANT))
926                 ofd->ofd_tot_granted_clients++;
927
928         spin_unlock(&ofd->ofd_grant_lock);
929
930         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %d want: %llu left: %llu\n",
931                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
932                exp, data->ocd_grant, want, left);
933
934         EXIT;
935 }
936
937 /**
938  * Release all grant space attached to a given export.
939  *
940  * Remove a client from the grant accounting totals.  We also remove
941  * the export from the obd device under the osfs and dev locks to ensure
942  * that the ofd_grant_sanity_check() calculations are always valid.
943  * The client should do something similar when it invalidates its import.
944  *
945  * \param[in] exp       client's export to remove from grant accounting
946  */
947 void ofd_grant_discard(struct obd_export *exp)
948 {
949         struct obd_device               *obd = exp->exp_obd;
950         struct ofd_device               *ofd = ofd_exp(exp);
951         struct filter_export_data       *fed = &exp->exp_filter_data;
952
953         spin_lock(&ofd->ofd_grant_lock);
954         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
955                  "%s: tot_granted %llu cli %s/%p fed_grant %ld\n",
956                  obd->obd_name, ofd->ofd_tot_granted,
957                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
958         ofd->ofd_tot_granted -= fed->fed_grant;
959         fed->fed_grant = 0;
960         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
961                  "%s: tot_pending %llu cli %s/%p fed_pending %ld\n",
962                  obd->obd_name, ofd->ofd_tot_pending,
963                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
964         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
965          * commmits */
966         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
967                  "%s: tot_dirty %llu cli %s/%p fed_dirty %ld\n",
968                  obd->obd_name, ofd->ofd_tot_dirty,
969                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
970         ofd->ofd_tot_dirty -= fed->fed_dirty;
971         fed->fed_dirty = 0;
972         spin_unlock(&ofd->ofd_grant_lock);
973 }
974
975 /**
976  * Process grant information from incoming bulk read request.
977  *
978  * Extract grant information packed in obdo structure (OBD_MD_FLGRANT set in
979  * o_valid). Bulk reads usually comes with grant announcements (number of dirty
980  * blocks, remaining amount of grant space, ...) and could also include a grant
981  * shrink request. Unlike bulk write, no additional grant space is returned on
982  * bulk read request.
983  *
984  * \param[in] env       is the lu environment provided by the caller
985  * \param[in] exp       is the export of the client which sent the request
986  * \param[in,out] oa    is the incoming obdo sent by the client
987  */
988 void ofd_grant_prepare_read(const struct lu_env *env,
989                             struct obd_export *exp, struct obdo *oa)
990 {
991         struct ofd_device       *ofd = ofd_exp(exp);
992         int                      do_shrink;
993         u64                      left = 0;
994         ENTRY;
995
996         if (!oa)
997                 RETURN_EXIT;
998
999         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
1000                 /* The read request does not contain any grant
1001                  * information */
1002                 RETURN_EXIT;
1003
1004         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1005             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
1006                 /* To process grant shrink request, we need to know how much
1007                  * available space remains on the backend filesystem.
1008                  * Shrink requests are not so common, we always get fresh
1009                  * statfs information. */
1010                 ofd_grant_statfs(env, exp, 1, NULL);
1011
1012                 /* protect all grant counters */
1013                 spin_lock(&ofd->ofd_grant_lock);
1014
1015                 /* Grab free space from cached statfs data and take out space
1016                  * already granted to clients as well as reserved space */
1017                 left = ofd_grant_space_left(exp);
1018
1019                 /* all set now to proceed with shrinking */
1020                 do_shrink = 1;
1021         } else {
1022                 /* no grant shrinking request packed in the obdo and
1023                  * since we don't grant space back on reads, no point
1024                  * in running statfs, so just skip it and process
1025                  * incoming grant data directly. */
1026                 spin_lock(&ofd->ofd_grant_lock);
1027                 do_shrink = 0;
1028         }
1029
1030         /* extract incoming grant information provided by the client and
1031          * inflate grant counters if required */
1032         ofd_grant_incoming(env, exp, oa, ofd_grant_chunk(exp, ofd, NULL));
1033
1034         /* unlike writes, we don't return grants back on reads unless a grant
1035          * shrink request was packed and we decided to turn it down. */
1036         if (do_shrink)
1037                 ofd_grant_shrink(exp, oa, left);
1038         else
1039                 oa->o_grant = 0;
1040
1041         if (!ofd_grant_param_supp(exp))
1042                 oa->o_grant = ofd_grant_deflate(ofd, oa->o_grant);
1043         spin_unlock(&ofd->ofd_grant_lock);
1044         EXIT;
1045 }
1046
1047 /**
1048  * Process grant information from incoming bulk write request.
1049  *
1050  * This function extracts client's grant announcements from incoming bulk write
1051  * request and attempts to allocate grant space for network buffers that need it
1052  * (i.e. OBD_BRW_FROM_GRANT not set in rnb_fags).
1053  * Network buffers which aren't granted the OBD_BRW_GRANTED flag should not
1054  * proceed further and should fail with -ENOSPC.
1055  * Whenever possible, additional grant space will be returned to the client
1056  * in the bulk write reply.
1057  * ofd_grant_prepare_write() must be called before writting any buffers to
1058  * the backend storage. This function works in pair with ofd_grant_commit()
1059  * which must be invoked once all buffers have been written to disk in order
1060  * to release space from the pending grant counter.
1061  *
1062  * \param[in] env       LU environment provided by the caller
1063  * \param[in] exp       export of the client which sent the request
1064  * \param[in] oa        incoming obdo sent by the client
1065  * \param[in] rnb       list of network buffers
1066  * \param[in] niocount  number of network buffers in the list
1067  */
1068 void ofd_grant_prepare_write(const struct lu_env *env,
1069                              struct obd_export *exp, struct obdo *oa,
1070                              struct niobuf_remote *rnb, int niocount)
1071 {
1072         struct obd_device       *obd = exp->exp_obd;
1073         struct ofd_device       *ofd = ofd_exp(exp);
1074         u64                      left;
1075         int                      from_cache;
1076         int                      force = 0; /* can use cached data intially */
1077         long                     chunk = ofd_grant_chunk(exp, ofd, NULL);
1078
1079         ENTRY;
1080
1081 refresh:
1082         /* get statfs information from OSD layer */
1083         ofd_grant_statfs(env, exp, force, &from_cache);
1084
1085         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
1086
1087         /* Grab free space from cached statfs data and take out space already
1088          * granted to clients as well as reserved space */
1089         left = ofd_grant_space_left(exp);
1090
1091         /* Get fresh statfs data if we are short in ungranted space */
1092         if (from_cache && left < 32 * chunk) {
1093                 spin_unlock(&ofd->ofd_grant_lock);
1094                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
1095                        obd->obd_name);
1096                 force = 1;
1097                 goto refresh;
1098         }
1099
1100         /* When close to free space exhaustion, trigger a sync to force
1101          * writeback cache to consume required space immediately and release as
1102          * much space as possible. */
1103         if (!obd->obd_recovering && force != 2 && left < chunk) {
1104                 bool from_grant = true;
1105                 int  i;
1106
1107                 /* That said, it is worth running a sync only if some pages did
1108                  * not consume grant space on the client and could thus fail
1109                  * with ENOSPC later in ofd_grant_check() */
1110                 for (i = 0; i < niocount; i++)
1111                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
1112                                 from_grant = false;
1113
1114                 if (!from_grant) {
1115                         /* at least one network buffer requires acquiring grant
1116                          * space on the server */
1117                         spin_unlock(&ofd->ofd_grant_lock);
1118                         /* discard errors, at least we tried ... */
1119                         dt_sync(env, ofd->ofd_osd);
1120                         force = 2;
1121                         goto refresh;
1122                 }
1123         }
1124
1125         /* extract incoming grant information provided by the client,
1126          * and inflate grant counters if required */
1127         ofd_grant_incoming(env, exp, oa, chunk);
1128
1129         /* check limit */
1130         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
1131
1132         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
1133                 spin_unlock(&ofd->ofd_grant_lock);
1134                 RETURN_EXIT;
1135         }
1136
1137         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
1138          * grant space. */
1139         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1140             (oa->o_flags & OBD_FL_SHRINK_GRANT))
1141                 ofd_grant_shrink(exp, oa, left);
1142         else
1143                 /* grant more space back to the client if possible */
1144                 oa->o_grant = ofd_grant_alloc(exp, oa->o_grant, oa->o_undirty,
1145                                               left, chunk, true);
1146
1147         if (!ofd_grant_param_supp(exp))
1148                 oa->o_grant = ofd_grant_deflate(ofd, oa->o_grant);
1149         spin_unlock(&ofd->ofd_grant_lock);
1150         EXIT;
1151 }
1152
1153 /**
1154  * Consume grant space reserved for object creation.
1155  *
1156  * Grant space is allocated to the local self export for object precreation.
1157  * This is required to prevent object precreation from consuming grant space
1158  * allocated to client nodes for the data writeback cache.
1159  * This function consumes enough space to create \a nr objects and allocates
1160  * more grant space to the self export for future precreation requests, if
1161  * possible.
1162  *
1163  * \param[in] env       LU environment provided by the caller
1164  * \param[in] exp       export holding the grant space for precreation (= self
1165  *                      export currently)
1166  * \param[in] nr        number of objects to be created
1167  *
1168  * \retval >= 0         amount of grant space allocated to the precreate request
1169  * \retval -ENOSPC      on failure
1170  */
1171 long ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
1172 {
1173         struct ofd_device               *ofd = ofd_exp(exp);
1174         struct filter_export_data       *fed = &exp->exp_filter_data;
1175         u64                              left = 0;
1176         unsigned long                    wanted;
1177         unsigned long                    granted;
1178         ENTRY;
1179
1180         if (exp->exp_obd->obd_recovering ||
1181             ofd->ofd_dt_conf.ddp_inodespace == 0)
1182                 /* don't enforce grant during recovery */
1183                 RETURN(0);
1184
1185         /* Update statfs data if required */
1186         ofd_grant_statfs(env, exp, 1, NULL);
1187
1188         /* protect all grant counters */
1189         spin_lock(&ofd->ofd_grant_lock);
1190
1191         /* fail precreate request if there is not enough blocks available for
1192          * writing */
1193         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
1194             (ofd->ofd_osfs.os_blocks >> 10)) {
1195                 spin_unlock(&ofd->ofd_grant_lock);
1196                 CDEBUG(D_RPCTRACE, "%s: not enough space for create %llu\n",
1197                        ofd_name(ofd),
1198                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
1199                 RETURN(-ENOSPC);
1200         }
1201
1202         /* Grab free space from cached statfs data and take out space
1203          * already granted to clients as well as reserved space */
1204         left = ofd_grant_space_left(exp);
1205
1206         /* compute how much space is required to handle the precreation
1207          * request */
1208         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1209         if (wanted > fed->fed_grant + left) {
1210                 /* that's beyond what remains, adjust the number of objects that
1211                  * can be safely precreated */
1212                 wanted = fed->fed_grant + left;
1213                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
1214                 if (*nr == 0) {
1215                         /* we really have no space any more for precreation,
1216                          * fail the precreate request with ENOSPC */
1217                         spin_unlock(&ofd->ofd_grant_lock);
1218                         RETURN(-ENOSPC);
1219                 }
1220                 /* compute space needed for the new number of creations */
1221                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
1222         }
1223         LASSERT(wanted <= fed->fed_grant + left);
1224
1225         if (wanted <= fed->fed_grant) {
1226                 /* we've enough grant space to handle this precreate request */
1227                 fed->fed_grant -= wanted;
1228         } else {
1229                 /* we need to take some space from the ungranted pool */
1230                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
1231                 left -= wanted - fed->fed_grant;
1232                 fed->fed_grant = 0;
1233         }
1234         granted = wanted;
1235         fed->fed_pending += granted;
1236         ofd->ofd_tot_pending += granted;
1237
1238         /* grant more space for precreate purpose if possible. */
1239         wanted = OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
1240         if (wanted > fed->fed_grant) {
1241                 long chunk;
1242
1243                 /* always try to book enough space to handle a large precreate
1244                  * request */
1245                 chunk = ofd_grant_chunk(exp, ofd, NULL);
1246                 wanted -= fed->fed_grant;
1247                 ofd_grant_alloc(exp, fed->fed_grant, wanted, left, chunk,
1248                                 false);
1249         }
1250         spin_unlock(&ofd->ofd_grant_lock);
1251         RETURN(granted);
1252 }
1253
1254 /**
1255  * Release grant space added to the pending counter by ofd_grant_prepare_write()
1256  *
1257  * Update pending grant counter once buffers have been written to the disk.
1258  *
1259  * \param[in] exp       export of the client which sent the request
1260  * \param[in] pending   amount of reserved space to be released
1261  * \param[in] rc        return code of pre-commit operations
1262  */
1263 void ofd_grant_commit(struct obd_export *exp, unsigned long pending,
1264                       int rc)
1265 {
1266         struct ofd_device       *ofd  = ofd_exp(exp);
1267         ENTRY;
1268
1269         /* get space accounted in tot_pending for the I/O, set in
1270          * ofd_grant_check() */
1271         if (pending == 0)
1272                 RETURN_EXIT;
1273
1274         spin_lock(&ofd->ofd_grant_lock);
1275         /* Don't update statfs data for errors raised before commit (e.g.
1276          * bulk transfer failed, ...) since we know those writes have not been
1277          * processed. For other errors hit during commit, we cannot really tell
1278          * whether or not something was written, so we update statfs data.
1279          * In any case, this should not be fatal since we always get fresh
1280          * statfs data before failing a request with ENOSPC */
1281         if (rc == 0) {
1282                 spin_lock(&ofd->ofd_osfs_lock);
1283                 /* Take pending out of cached statfs data */
1284                 ofd->ofd_osfs.os_bavail -= min_t(u64,
1285                                                  ofd->ofd_osfs.os_bavail,
1286                                                  pending >> ofd->ofd_blockbits);
1287                 if (ofd->ofd_statfs_inflight)
1288                         /* someone is running statfs and want to be notified of
1289                          * writes happening meanwhile */
1290                         ofd->ofd_osfs_inflight += pending;
1291                 spin_unlock(&ofd->ofd_osfs_lock);
1292         }
1293
1294         if (exp->exp_filter_data.fed_pending < pending) {
1295                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1296                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1297                        exp->exp_filter_data.fed_pending, pending);
1298                 spin_unlock(&ofd->ofd_grant_lock);
1299                 LBUG();
1300         }
1301         exp->exp_filter_data.fed_pending -= pending;
1302
1303         if (ofd->ofd_tot_granted < pending) {
1304                 CERROR("%s: cli %s/%p tot_granted(%llu) < grant_used(%lu)\n",
1305                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1306                        ofd->ofd_tot_granted, pending);
1307                 spin_unlock(&ofd->ofd_grant_lock);
1308                 LBUG();
1309         }
1310         ofd->ofd_tot_granted -= pending;
1311
1312         if (ofd->ofd_tot_pending < pending) {
1313                 CERROR("%s: cli %s/%p tot_pending(%llu) < grant_used(%lu)\n",
1314                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1315                        ofd->ofd_tot_pending, pending);
1316                 spin_unlock(&ofd->ofd_grant_lock);
1317                 LBUG();
1318         }
1319         ofd->ofd_tot_pending -= pending;
1320         spin_unlock(&ofd->ofd_grant_lock);
1321         EXIT;
1322 }
1323
1324 struct ofd_grant_cb {
1325         /* commit callback structure */
1326         struct dt_txn_commit_cb  ogc_cb;
1327         /* export associated with the bulk write */
1328         struct obd_export       *ogc_exp;
1329         /* pending grant to be released */
1330         unsigned long            ogc_granted;
1331 };
1332
1333 /**
1334  * Callback function for grant releasing
1335  *
1336  * Release grant space reserved by the client node.
1337  *
1338  * \param[in] env       execution environment
1339  * \param[in] th        transaction handle
1340  * \param[in] cb        callback data
1341  * \param[in] err       error code
1342  */
1343 static void ofd_grant_commit_cb(struct lu_env *env, struct thandle *th,
1344                                 struct dt_txn_commit_cb *cb, int err)
1345 {
1346         struct ofd_grant_cb     *ogc;
1347
1348         ogc = container_of(cb, struct ofd_grant_cb, ogc_cb);
1349
1350         ofd_grant_commit(ogc->ogc_exp, ogc->ogc_granted, err);
1351         class_export_cb_put(ogc->ogc_exp);
1352         OBD_FREE_PTR(ogc);
1353 }
1354
1355 /**
1356  * Add callback for grant releasing
1357  *
1358  * Register a commit callback to release grant space.
1359  *
1360  * \param[in] th        transaction handle
1361  * \param[in] exp       OBD export of client
1362  * \param[in] granted   amount of grant space to be released upon commit
1363  *
1364  * \retval              0 on successful callback adding
1365  * \retval              negative value on error
1366  */
1367 int ofd_grant_commit_cb_add(struct thandle *th, struct obd_export *exp,
1368                             unsigned long granted)
1369 {
1370         struct ofd_grant_cb     *ogc;
1371         struct dt_txn_commit_cb *dcb;
1372         int                      rc;
1373         ENTRY;
1374
1375         OBD_ALLOC_PTR(ogc);
1376         if (ogc == NULL)
1377                 RETURN(-ENOMEM);
1378
1379         ogc->ogc_exp = class_export_cb_get(exp);
1380         ogc->ogc_granted = granted;
1381
1382         dcb = &ogc->ogc_cb;
1383         dcb->dcb_func = ofd_grant_commit_cb;
1384         INIT_LIST_HEAD(&dcb->dcb_linkage);
1385         strlcpy(dcb->dcb_name, "ofd_grant_commit_cb", sizeof(dcb->dcb_name));
1386
1387         rc = dt_trans_cb_add(th, dcb);
1388         if (rc) {
1389                 class_export_cb_put(ogc->ogc_exp);
1390                 OBD_FREE_PTR(ogc);
1391         }
1392
1393         RETURN(rc);
1394 }