Whamcloud - gitweb
LU-4993 libcfs: convert nodemask_t to linux bitmask
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_grant.c
37  *
38  * Author: Johann Lombardi <johann@whamcloud..com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include "ofd_internal.h"
44
45 /* At least enough to send a couple of 1MB RPCs, even if not max sized */
46 #define OFD_GRANT_CHUNK                 (2ULL * DT_MAX_BRW_SIZE)
47
48 /* Clients typically hold 2x their max_rpcs_in_flight of grant space */
49 #define OFD_GRANT_SHRINK_LIMIT(exp)     (2ULL * 8 * exp_max_brw_size(exp))
50
51 static inline obd_size ofd_grant_from_cli(struct obd_export *exp,
52                                           struct ofd_device *ofd, obd_size val)
53 {
54         if (ofd_grant_compat(exp, ofd))
55                 /* clients not supporting OBD_CONNECT_GRANT_PARAM actually
56                  * consume 4KB of grant per block, we should thus inflate
57                  * the grant counters to reflect what was actually consumed */
58                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
59         return val;
60 }
61
62 static inline obd_size ofd_grant_to_cli(struct obd_export *exp,
63                                         struct ofd_device *ofd, obd_size val)
64 {
65         if (ofd_grant_compat(exp, ofd))
66                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
67         return val;
68 }
69
70 static inline obd_size ofd_grant_chunk(struct obd_export *exp,
71                                        struct ofd_device *ofd)
72 {
73         if (ofd_obd(ofd)->obd_self_export == exp)
74                 /* Grant enough space to handle a big precreate request */
75                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
76
77         if (ofd_grant_compat(exp, ofd))
78                 /* Try to grant enough space to send a full-size RPC */
79                 return exp_max_brw_size(exp) <<
80                        (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
81
82         /* Try to return enough to send two full RPCs, if needed */
83         return exp_max_brw_size(exp) * 2;
84 }
85
86 /**
87  * Perform extra sanity checks for grant accounting. This is done at connect,
88  * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
89  * always get rid of it or turn it off when we know accounting is good.
90  *
91  * \param obd - is the device to check
92  * \param func - is the function to call if an inconsistency is found
93  */
94 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
95 {
96         struct filter_export_data       *fed;
97         struct ofd_device               *ofd = ofd_dev(obd->obd_lu_dev);
98         struct obd_export               *exp;
99         obd_size                         maxsize;
100         obd_size                         tot_dirty = 0;
101         obd_size                         tot_pending = 0;
102         obd_size                         tot_granted = 0;
103         obd_size                         fo_tot_dirty, fo_tot_pending;
104         obd_size                         fo_tot_granted;
105
106         if (cfs_list_empty(&obd->obd_exports))
107                 return;
108
109         /* We don't want to do this for large machines that do lots of
110          * mounts or unmounts.  It burns... */
111         if (obd->obd_num_exports > 100)
112                 return;
113
114         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
115
116         spin_lock(&obd->obd_dev_lock);
117         spin_lock(&ofd->ofd_grant_lock);
118         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
119                 int error = 0;
120
121                 fed = &exp->exp_filter_data;
122
123                 if (obd->obd_self_export == exp)
124                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
125                                "%ld\n", obd->obd_name, fed->fed_grant,
126                                fed->fed_pending, fed->fed_dirty);
127
128                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
129                     fed->fed_dirty < 0)
130                         error = 1;
131                 if (fed->fed_grant + fed->fed_pending > maxsize) {
132                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
133                                " > maxsize("LPU64")\n", obd->obd_name,
134                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
135                                fed->fed_pending, maxsize);
136                         spin_unlock(&obd->obd_dev_lock);
137                         spin_unlock(&ofd->ofd_grant_lock);
138                         LBUG();
139                 }
140                 if (fed->fed_dirty > maxsize) {
141                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
142                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
143                                exp, fed->fed_dirty, maxsize);
144                         spin_unlock(&obd->obd_dev_lock);
145                         spin_unlock(&ofd->ofd_grant_lock);
146                         LBUG();
147                 }
148                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
149                              "%ld pend %ld grant %ld\n", obd->obd_name,
150                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
151                              fed->fed_pending, fed->fed_grant);
152                 tot_granted += fed->fed_grant + fed->fed_pending;
153                 tot_pending += fed->fed_pending;
154                 tot_dirty += fed->fed_dirty;
155         }
156         spin_unlock(&obd->obd_dev_lock);
157         fo_tot_granted = ofd->ofd_tot_granted;
158         fo_tot_pending = ofd->ofd_tot_pending;
159         fo_tot_dirty = ofd->ofd_tot_dirty;
160
161         if (tot_granted != fo_tot_granted)
162                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
163                        func, tot_granted, fo_tot_granted);
164         if (tot_pending != fo_tot_pending)
165                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
166                        func, tot_pending, fo_tot_pending);
167         if (tot_dirty != fo_tot_dirty)
168                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
169                        func, tot_dirty, fo_tot_dirty);
170         if (tot_pending > tot_granted)
171                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
172                        func, tot_pending, tot_granted);
173         if (tot_granted > maxsize)
174                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
175                        func, tot_granted, maxsize);
176         if (tot_dirty > maxsize)
177                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
178                        func, tot_dirty, maxsize);
179         spin_unlock(&ofd->ofd_grant_lock);
180 }
181
182 /**
183  * Get fresh statfs information from the OSD layer if the cache is older than 1s
184  * or if force is set. The OSD layer is in charge of estimating data & metadata
185  * overhead.
186  *
187  * \param env - is the lu environment passed by the caller
188  * \param exp - export used to print client info in debug messages
189  * \param force - is used to force a refresh of statfs information
190  * \param from_cache - returns whether the statfs information are
191  *                   taken from cache
192  */
193 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
194                              int force, int *from_cache)
195 {
196         struct obd_device       *obd = exp->exp_obd;
197         struct ofd_device       *ofd = ofd_exp(exp);
198         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
199         __u64                    max_age;
200         int                      rc;
201
202         if (force)
203                 max_age = 0; /* get fresh statfs data */
204         else
205                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
206
207         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
208         if (unlikely(rc)) {
209                 if (from_cache)
210                         *from_cache = 0;
211                 return;
212         }
213
214         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
215                obd->obd_name, exp->exp_client_uuid.uuid, exp,
216                osfs->os_bfree << ofd->ofd_blockbits,
217                osfs->os_bavail << ofd->ofd_blockbits);
218 }
219
220 /**
221  * Figure out how much space is available on the backend filesystem.
222  * This is done by accessing cached statfs data previously populated by
223  * ofd_grant_statfs(), from which we withdraw the space already granted to
224  * clients and the reserved space.
225  *
226  * \param exp - export which received the write request
227  */
228 static obd_size ofd_grant_space_left(struct obd_export *exp)
229 {
230         struct obd_device       *obd = exp->exp_obd;
231         struct ofd_device       *ofd = ofd_exp(exp);
232         obd_size                 tot_granted;
233         obd_size                 left, avail;
234         obd_size                 unstable;
235
236         ENTRY;
237         assert_spin_locked(&ofd->ofd_grant_lock);
238
239         spin_lock(&ofd->ofd_osfs_lock);
240         /* get available space from cached statfs data */
241         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
242         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
243         spin_unlock(&ofd->ofd_osfs_lock);
244
245         tot_granted = ofd->ofd_tot_granted;
246
247         if (left < tot_granted) {
248                 int mask = (left + unstable <
249                             tot_granted - ofd->ofd_tot_pending) ?
250                             D_ERROR : D_CACHE;
251
252                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
253                              LPU64" unstable "LPU64" pending "LPU64" "
254                              "dirty "LPU64"\n",
255                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
256                              left, tot_granted, unstable,
257                              ofd->ofd_tot_pending, ofd->ofd_tot_dirty);
258                 RETURN(0);
259         }
260
261         avail = left;
262         /* Withdraw space already granted to clients */
263         left -= tot_granted;
264
265         /* If the left space is below the grant threshold x available space,
266          * stop granting space to clients.
267          * The purpose of this threshold is to keep some error margin on the
268          * overhead estimate made by the OSD layer. If we grant all the free
269          * space, we have no way (grant space cannot be revoked yet) to
270          * adjust if the write overhead has been underestimated. */
271         left -= min_t(obd_size, left, ofd_grant_reserved(ofd, avail));
272
273         /* Align left on block size */
274         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
275
276         CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
277                LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
278                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
279                tot_granted, ofd->ofd_tot_pending);
280
281         RETURN(left);
282 }
283
284 /**
285  * Grab the dirty and seen grant announcements from the incoming obdo.
286  * We will later calculate the client's new grant and return it.
287  * Caller must hold ofd_grant_lock spinlock.
288  *
289  * \param env - is the lu environment supplying osfs storage
290  * \param exp - is the export for which we received the request
291  * \paral oa - is the incoming obdo sent by the client
292  */
293 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
294                                struct obdo *oa)
295 {
296         struct filter_export_data       *fed;
297         struct ofd_device               *ofd = ofd_exp(exp);
298         struct obd_device               *obd = exp->exp_obd;
299         long                             dirty, dropped, grant_chunk;
300         ENTRY;
301
302         assert_spin_locked(&ofd->ofd_grant_lock);
303
304         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
305                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
306                 oa->o_valid &= ~OBD_MD_FLGRANT;
307                 RETURN_EXIT;
308         }
309
310         fed = &exp->exp_filter_data;
311
312         /* Add some margin, since there is a small race if other RPCs arrive
313          * out-or-order and have already consumed some grant.  We want to
314          * leave this here in case there is a large error in accounting. */
315         CDEBUG(D_CACHE,
316                "%s: cli %s/%p reports grant "LPU64" dropped %u, local %lu\n",
317                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
318                oa->o_dropped, fed->fed_grant);
319
320         if ((long long)oa->o_dirty < 0)
321                 oa->o_dirty = 0;
322
323         dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
324         dropped     = ofd_grant_from_cli(exp, ofd, (obd_size)oa->o_dropped);
325         grant_chunk = ofd_grant_chunk(exp, ofd);
326
327         /* Update our accounting now so that statfs takes it into account.
328          * Note that fed_dirty is only approximate and can become incorrect
329          * if RPCs arrive out-of-order.  No important calculations depend
330          * on fed_dirty however, but we must check sanity to not assert. */
331         if (dirty > fed->fed_grant + 4 * grant_chunk)
332                 dirty = fed->fed_grant + 4 * grant_chunk;
333         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
334         if (fed->fed_grant < dropped) {
335                 CDEBUG(D_CACHE,
336                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
337                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
338                        fed->fed_grant);
339                 dropped = 0;
340         }
341         if (ofd->ofd_tot_granted < dropped) {
342                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant "LPU64
343                        "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
344                        dropped, ofd->ofd_tot_granted);
345                 dropped = 0;
346         }
347         ofd->ofd_tot_granted -= dropped;
348         fed->fed_grant -= dropped;
349         fed->fed_dirty = dirty;
350
351         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
352                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
353                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
354                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
355                 spin_unlock(&ofd->ofd_grant_lock);
356                 LBUG();
357         }
358         EXIT;
359 }
360
361 /**
362  * Called when the client is able to release some grants. Proceed with the
363  * shrink request when there is less ungranted space remaining
364  * than the amount all of the connected clients would consume if they
365  * used their full grant.
366  *
367  * \param exp - is the export for which we received the request
368  * \paral oa - is the incoming obdo sent by the client
369  * \param left_space - is the remaining free space with space already granted
370  *                   taken out
371  */
372 static void ofd_grant_shrink(struct obd_export *exp,
373                              struct obdo *oa, obd_size left_space)
374 {
375         struct filter_export_data       *fed;
376         struct ofd_device               *ofd = ofd_exp(exp);
377         struct obd_device               *obd = exp->exp_obd;
378         long                             grant_shrink;
379
380         assert_spin_locked(&ofd->ofd_grant_lock);
381         LASSERT(exp);
382         if (left_space >= ofd->ofd_tot_granted_clients *
383                           OFD_GRANT_SHRINK_LIMIT(exp))
384                 return;
385
386         grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
387
388         fed = &exp->exp_filter_data;
389         fed->fed_grant       -= grant_shrink;
390         ofd->ofd_tot_granted -= grant_shrink;
391
392         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
393                LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
394                exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
395
396         /* client has just released some grant, don't grant any space back */
397         oa->o_grant = 0;
398 }
399
400 /**
401  * Calculate how much space is required to write a given network buffer
402  */
403 static inline int ofd_grant_rnb_size(struct obd_export *exp,
404                                      struct ofd_device *ofd,
405                                      struct niobuf_remote *rnb)
406 {
407         obd_size blocksize, bytes, end;
408
409         if (exp && ofd_grant_compat(exp, ofd))
410                 blocksize = 1ULL << COMPAT_BSIZE_SHIFT;
411         else
412                 blocksize = 1ULL << ofd->ofd_blockbits;
413
414         /* The network buffer might span several blocks, align it on block
415          * boundaries */
416         bytes  = rnb->rnb_offset & (blocksize - 1);
417         bytes += rnb->rnb_len;
418         end    = bytes & (blocksize - 1);
419         if (end)
420                 bytes += blocksize - end;
421         if (exp)
422                 /* Apply per-export pecularities if one is given */
423                 bytes = ofd_grant_from_cli(exp, ofd, (obd_size)bytes);
424         return bytes;
425 }
426
427
428 /**
429  * When clients have dirtied as much space as they've been granted they
430  * fall through to sync writes.  These sync writes haven't been expressed
431  * in grants and need to error with ENOSPC when there isn't room in the
432  * filesystem for them after grants are taken into account.  However,
433  * writeback of the dirty data that was already granted space can write
434  * right on through.
435  * Caller must hold ofd_grant_lock spinlock.
436  *
437  * \param env - is the lu environment passed by the caller
438  * \param exp - is the export identifying the client which sent the RPC
439  * \param oa  - is the incoming obdo in which we should return the pack the
440  *            additional grant
441  * \param rnb - is the list of network buffers
442  * \param niocont - is the number of network buffers in the list
443  * \param left - is the remaining free space with space already granted
444  *             taken out
445  */
446 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
447                             struct obdo *oa, struct niobuf_remote *rnb,
448                             int niocount, obd_size *left)
449 {
450         struct filter_export_data       *fed = &exp->exp_filter_data;
451         struct obd_device               *obd = exp->exp_obd;
452         struct ofd_device               *ofd = ofd_exp(exp);
453         unsigned long                    ungranted = 0;
454         unsigned long                    granted = 0;
455         int                              i;
456         int                              resend = 0;
457         struct ofd_thread_info          *info = ofd_info(env);
458
459         ENTRY;
460
461         assert_spin_locked(&ofd->ofd_grant_lock);
462
463         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464             (oa->o_flags & OBD_FL_RECOV_RESEND)) {
465                 resend = 1;
466                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
467                                 "accounting\n");
468         }
469
470         for (i = 0; i < niocount; i++) {
471                 int bytes;
472
473                 if (obd->obd_recovering) {
474                         /* Replaying write. Grant info have been processed
475                          * already so no need to do any enforcement here.
476                          * It is worth noting that only bulk writes with all
477                          * rnbs having OBD_BRW_FROM_GRANT can be replayed.
478                          * If one page hasn't OBD_BRW_FROM_GRANT set, then
479                          * the whole bulk is written synchronously */
480                         if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
481                                  rnb[i].rnb_flags |= OBD_BRW_GRANTED;
482                                  continue;
483                         } else {
484                                 CERROR("%s: cli %s is replaying OST_WRITE "
485                                        "while one rnb hasn't OBD_BRW_FROM_GRANT"
486                                        " set (0x%x)\n", exp->exp_obd->obd_name,
487                                         exp->exp_client_uuid.uuid,
488                                         rnb[i].rnb_flags);
489
490                         }
491                 } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
492                            (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
493                         if (resend) {
494                                 /* This is a recoverable resend so grant
495                                  * information have already been processed */
496                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
497                                 continue;
498                         }
499
500                         /* inflate consumed space if needed */
501                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
502                         if (fed->fed_grant < granted + bytes) {
503                                 CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
504                                        "GRANT, real grant %lu idx %d\n",
505                                        exp->exp_obd->obd_name,
506                                        exp->exp_client_uuid.uuid, exp,
507                                        granted, bytes, fed->fed_grant, i);
508                         } else {
509                                 granted += bytes;
510                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
511                                 continue;
512                         }
513                 }
514
515                 /* Consume grant space on the server.
516                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
517                  * so that the required grant space isn't inflated. This is
518                  * done on purpose since the server can deal with large block
519                  * size, unlike some clients */
520                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
521                 if (*left > ungranted + bytes) {
522                         /* if enough space, pretend it was granted */
523                         ungranted += bytes;
524                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
525                         continue;
526                 }
527
528                 /* We can't check for already-mapped blocks here (make sense
529                  * when backend filesystem does not use COW) as it requires
530                  * dropping the grant lock.
531                  * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
532                  * to go through and verify if all of the blocks not marked
533                  *  BRW_GRANTED are already mapped and we can ignore this error.
534                  */
535                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
536                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
537                                 exp->exp_obd->obd_name,
538                                 exp->exp_client_uuid.uuid, exp, i, bytes);
539         }
540
541         /* record space used for the I/O, will be used in ofd_grant_commmit() */
542         /* Now substract what the clients has used already.  We don't subtract
543          * this from the tot_granted yet, so that other client's can't grab
544          * that space before we have actually allocated our blocks. That
545          * happens in ofd_grant_commit() after the writes are done. */
546         info->fti_used = granted + ungranted;
547         *left -= ungranted;
548         fed->fed_grant -= granted;
549         fed->fed_pending += info->fti_used;
550         ofd->ofd_tot_granted += ungranted;
551         ofd->ofd_tot_pending += info->fti_used;
552
553         CDEBUG(D_CACHE,
554                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
555                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
556                granted, ungranted, fed->fed_grant, fed->fed_dirty);
557
558         if (obd->obd_recovering)
559                 /* don't update dirty accounting during recovery */
560                 RETURN_EXIT;
561
562         if (fed->fed_dirty < granted) {
563                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
564                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
565                        granted, fed->fed_dirty);
566                 granted = fed->fed_dirty;
567         }
568         ofd->ofd_tot_dirty -= granted;
569         fed->fed_dirty -= granted;
570
571         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
572                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
573                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
574                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
575                 spin_unlock(&ofd->ofd_grant_lock);
576                 LBUG();
577         }
578         EXIT;
579 }
580
581 /**
582  * Calculate how much grant space to return to client, based on how much space
583  * is currently free and how much of that is already granted.
584  * Caller must hold ofd_grant_lock spinlock.
585  *
586  * \param exp - is the export of the client which sent the request
587  * \param curgrant - is the current grant claimed by the client
588  * \param want - is how much grant space the client would like to have
589  * \param left - is the remaining free space with granted space taken out
590  * \param conservative - is how server grants, if true, a certain amount, else
591  *        server will grant as client requested.
592  */
593 static long ofd_grant(struct obd_export *exp, obd_size curgrant,
594                       obd_size want, obd_size left, bool conservative)
595 {
596         struct obd_device               *obd = exp->exp_obd;
597         struct ofd_device               *ofd = ofd_exp(exp);
598         struct filter_export_data       *fed = &exp->exp_filter_data;
599         long                             grant_chunk;
600         obd_size                         grant;
601
602         ENTRY;
603
604         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
605                 RETURN(0);
606
607         if (want > 0x7fffffff) {
608                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
609                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
610                 RETURN(0);
611         }
612
613         /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
614          * size while the reality is different */
615         curgrant = ofd_grant_from_cli(exp, ofd, curgrant);
616         want = ofd_grant_from_cli(exp, ofd, want);
617         grant_chunk = ofd_grant_chunk(exp, ofd);
618
619         /* Grant some fraction of the client's requested grant space so that
620          * they are not always waiting for write credits (not all of it to
621          * avoid overgranting in face of multiple RPCs in flight).  This
622          * essentially will be able to control the OSC_MAX_RIF for a client.
623          *
624          * If we do have a large disparity between what the client thinks it
625          * has and what we think it has, don't grant very much and let the
626          * client consume its grant first.  Either it just has lots of RPCs
627          * in flight, or it was evicted and its grants will soon be used up. */
628         if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
629                    RETURN(0);
630
631         if (obd->obd_recovering)
632                 conservative = false;
633
634         if (conservative)
635                 /* don't grant more than 1/8th of the remaining free space in
636                  * one chunk */
637                 left >>= 3;
638         grant = min(want, left);
639         /* round grant upt to the next block size */
640         grant = (grant + (1 << ofd->ofd_blockbits) - 1) &
641                 ~((1ULL << ofd->ofd_blockbits) - 1);
642
643         if (!grant)
644                 RETURN(0);
645
646         /* Limit to ofd_grant_chunk() if not reconnect/recovery */
647         if ((grant > grant_chunk) && conservative)
648                 grant = grant_chunk;
649
650         ofd->ofd_tot_granted += grant;
651         fed->fed_grant += grant;
652
653         if (fed->fed_grant < 0) {
654                 CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
655                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
656                        fed->fed_grant, want, curgrant);
657                 spin_unlock(&ofd->ofd_grant_lock);
658                 LBUG();
659         }
660
661         CDEBUG(D_CACHE,
662                "%s: cli %s/%p wants: "LPU64" current grant "LPU64
663                " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
664                exp, want, curgrant, grant);
665         CDEBUG(D_CACHE,
666                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
667                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
668                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
669                obd->obd_num_exports);
670
671         RETURN(ofd_grant_to_cli(exp, ofd, grant));
672 }
673
674 /**
675  * Client connection or reconnection.
676  *
677  * \param env - is the lu environment provided by the caller
678  * \param exp - is the client's export which is reconnecting
679  * \param want - is how much the client would like to get
680  * \param conservative - is how server grants to client, if true server will
681  *        only grant certain amount, else server will grant client requested
682  *        amount.
683  */
684 long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
685                        obd_size want, bool conservative)
686 {
687         struct ofd_device               *ofd = ofd_exp(exp);
688         struct filter_export_data       *fed = &exp->exp_filter_data;
689         obd_size                         left = 0;
690         long                             grant;
691         int                              from_cache;
692         int                              force = 0; /* can use cached data */
693
694         /* don't grant space to client with read-only access */
695         if ((exp_connect_flags(exp) & OBD_CONNECT_RDONLY) ||
696             ofd_grant_prohibit(exp, ofd))
697                 return 0;
698
699 refresh:
700         ofd_grant_statfs(env, exp, force, &from_cache);
701
702         spin_lock(&ofd->ofd_grant_lock);
703
704         /* Grab free space from cached info and take out space already granted
705          * to clients as well as reserved space */
706         left = ofd_grant_space_left(exp);
707
708         /* get fresh statfs data if we are short in ungranted space */
709         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
710                 spin_unlock(&ofd->ofd_grant_lock);
711                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
712                 force = 1;
713                 goto refresh;
714         }
715
716         ofd_grant(exp, ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant),
717                   want, left, conservative);
718
719         /* return to client its current grant */
720         grant = ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant);
721         ofd->ofd_tot_granted_clients++;
722
723         spin_unlock(&ofd->ofd_grant_lock);
724
725         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
726                LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
727                exp, grant, want, left);
728
729         return grant;
730 }
731
732 /**
733  * Remove a client from the grant accounting totals.  We also remove
734  * the export from the obd device under the osfs and dev locks to ensure
735  * that the ofd_grant_sanity_check() calculations are always valid.
736  * The client should do something similar when it invalidates its import.
737  *
738  * \param exp - is the client's export to remove from grant accounting
739  */
740 void ofd_grant_discard(struct obd_export *exp)
741 {
742         struct obd_device               *obd = exp->exp_obd;
743         struct ofd_device               *ofd = ofd_exp(exp);
744         struct filter_export_data       *fed = &exp->exp_filter_data;
745
746         spin_lock(&ofd->ofd_grant_lock);
747         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
748                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
749                  obd->obd_name, ofd->ofd_tot_granted,
750                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
751         ofd->ofd_tot_granted -= fed->fed_grant;
752         fed->fed_grant = 0;
753         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
754                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
755                  obd->obd_name, ofd->ofd_tot_pending,
756                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
757         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
758          * finishes */
759         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
760                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
761                  obd->obd_name, ofd->ofd_tot_dirty,
762                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
763         ofd->ofd_tot_dirty -= fed->fed_dirty;
764         fed->fed_dirty = 0;
765         spin_unlock(&ofd->ofd_grant_lock);
766 }
767
768 /**
769  * Called at prepare time when handling read request. This function extracts
770  * incoming grant information from the obdo and processes the grant shrink
771  * request, if any.
772  *
773  * \param env - is the lu environment provided by the caller
774  * \param exp - is the export of the client which sent the request
775  * \paral oa - is the incoming obdo sent by the client
776  */
777 void ofd_grant_prepare_read(const struct lu_env *env,
778                             struct obd_export *exp, struct obdo *oa)
779 {
780         struct ofd_device       *ofd = ofd_exp(exp);
781         int                      do_shrink;
782         obd_size                 left = 0;
783
784         if (!oa)
785                 return;
786
787         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
788                 /* The read request does not contain any grant
789                  * information */
790                 return;
791
792         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
793             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
794                 /* To process grant shrink request, we need to know how much
795                  * available space remains on the backend filesystem.
796                  * Shrink requests are not so common, we always get fresh
797                  * statfs information. */
798                 ofd_grant_statfs(env, exp, 1, NULL);
799
800                 /* protect all grant counters */
801                 spin_lock(&ofd->ofd_grant_lock);
802
803                 /* Grab free space from cached statfs data and take out space
804                  * already granted to clients as well as reserved space */
805                 left = ofd_grant_space_left(exp);
806
807                 /* all set now to proceed with shrinking */
808                 do_shrink = 1;
809         } else {
810                 /* no grant shrinking request packed in the obdo and
811                  * since we don't grant space back on reads, no point
812                  * in running statfs, so just skip it and process
813                  * incoming grant data directly. */
814                 spin_lock(&ofd->ofd_grant_lock);
815                 do_shrink = 0;
816         }
817
818         /* extract incoming grant infomation provided by the client */
819         ofd_grant_incoming(env, exp, oa);
820
821         /* unlike writes, we don't return grants back on reads unless a grant
822          * shrink request was packed and we decided to turn it down. */
823         if (do_shrink)
824                 ofd_grant_shrink(exp, oa, left);
825         else
826                 oa->o_grant = 0;
827
828         spin_unlock(&ofd->ofd_grant_lock);
829 }
830
831 /**
832  * Called at write prepare time to handle incoming grant, check that we have
833  * enough space and grant some space back to the client if possible.
834  *
835  * \param env - is the lu environment provided by the caller
836  * \param exp - is the export of the client which sent the request
837  * \paral oa - is the incoming obdo sent by the client
838  * \param rnb - is the list of network buffers
839  * \param niocont - is the number of network buffers in the list
840  */
841 void ofd_grant_prepare_write(const struct lu_env *env,
842                              struct obd_export *exp, struct obdo *oa,
843                              struct niobuf_remote *rnb, int niocount)
844 {
845         struct obd_device       *obd = exp->exp_obd;
846         struct ofd_device       *ofd = ofd_exp(exp);
847         obd_size                 left;
848         int                      from_cache;
849         int                      force = 0; /* can use cached data intially */
850         int                      rc;
851
852         ENTRY;
853
854 refresh:
855         /* get statfs information from OSD layer */
856         ofd_grant_statfs(env, exp, force, &from_cache);
857
858         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
859
860         /* Grab free space from cached statfs data and take out space already
861          * granted to clients as well as reserved space */
862         left = ofd_grant_space_left(exp);
863
864         /* Get fresh statfs data if we are short in ungranted space */
865         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
866                 spin_unlock(&ofd->ofd_grant_lock);
867                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
868                        obd->obd_name);
869                 force = 1;
870                 goto refresh;
871         }
872
873         /* When close to free space exhaustion, trigger a sync to force
874          * writeback cache to consume required space immediately and release as
875          * much space as possible. */
876         if (!obd->obd_recovering && force != 2 && left < OFD_GRANT_CHUNK) {
877                 bool from_grant = true;
878                 int  i;
879
880                 /* That said, it is worth running a sync only if some pages did
881                  * not consume grant space on the client and could thus fail
882                  * with ENOSPC later in ofd_grant_check() */
883                 for (i = 0; i < niocount; i++)
884                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
885                                 from_grant = false;
886
887                 if (!from_grant) {
888                         /* at least one network buffer requires acquiring grant
889                          * space on the server */
890                         spin_unlock(&ofd->ofd_grant_lock);
891                         /* discard errors, at least we tried ... */
892                         rc = dt_sync(env, ofd->ofd_osd);
893                         force = 2;
894                         goto refresh;
895                 }
896         }
897
898         /* extract incoming grant information provided by the client */
899         ofd_grant_incoming(env, exp, oa);
900
901         /* check limit */
902         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
903
904         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
905                 spin_unlock(&ofd->ofd_grant_lock);
906                 RETURN_EXIT;
907         }
908
909         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
910          * grant space. */
911         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
912             (oa->o_flags & OBD_FL_SHRINK_GRANT))
913                 ofd_grant_shrink(exp, oa, left);
914         else
915                 /* grant more space back to the client if possible */
916                 oa->o_grant = ofd_grant(exp, oa->o_grant, oa->o_undirty, left,
917                                         true);
918         spin_unlock(&ofd->ofd_grant_lock);
919 }
920
921 /**
922  * Called during object precreation to consume grant space.
923  * More space is granted for precreation if possible.
924  *
925  * \param env - is the lu environment provided by the caller
926  * \param exp - is the export holding the grant space for precreation (= self
927  *            export currently)
928  * \paral nr - is the number of objects the caller wants to create objects
929  */
930 int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
931 {
932         struct ofd_thread_info          *info = ofd_info(env);
933         struct ofd_device               *ofd = ofd_exp(exp);
934         struct filter_export_data       *fed = &exp->exp_filter_data;
935         obd_size                         left = 0;
936         unsigned long                    wanted;
937         ENTRY;
938
939         info->fti_used = 0;
940
941         if (exp->exp_obd->obd_recovering ||
942             ofd->ofd_dt_conf.ddp_inodespace == 0)
943                 /* don't enforce grant during recovery */
944                 RETURN(0);
945
946         /* Update statfs data if required */
947         ofd_grant_statfs(env, exp, 1, NULL);
948
949         /* protect all grant counters */
950         spin_lock(&ofd->ofd_grant_lock);
951
952         /* fail precreate request if there is not enough blocks available for
953          * writing */
954         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
955             (ofd->ofd_osfs.os_blocks >> 10)) {
956                 spin_unlock(&ofd->ofd_grant_lock);
957                 CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
958                        ofd_name(ofd),
959                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
960                 RETURN(-ENOSPC);
961         }
962
963         /* Grab free space from cached statfs data and take out space
964          * already granted to clients as well as reserved space */
965         left = ofd_grant_space_left(exp);
966
967         /* compute how much space is required to handle the precreation
968          * request */
969         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
970         if (wanted > fed->fed_grant + left) {
971                 /* that's beyond what remains, adjust the number of objects that
972                  * can be safely precreated */
973                 wanted = fed->fed_grant + left;
974                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
975                 if (*nr == 0) {
976                         /* we really have no space any more for precreation,
977                          * fail the precreate request with ENOSPC */
978                         spin_unlock(&ofd->ofd_grant_lock);
979                         RETURN(-ENOSPC);
980                 }
981                 /* compute space needed for the new number of creations */
982                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
983         }
984         LASSERT(wanted <= fed->fed_grant + left);
985
986         if (wanted <= fed->fed_grant) {
987                 /* we've enough grant space to handle this precreate request */
988                 fed->fed_grant -= wanted;
989         } else {
990                 /* we need to take some space from the ungranted pool */
991                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
992                 left -= wanted - fed->fed_grant;
993                 fed->fed_grant = 0;
994         }
995         info->fti_used = wanted;
996         fed->fed_pending += info->fti_used;
997         ofd->ofd_tot_pending += info->fti_used;
998
999         /* grant more space for precreate purpose if possible. */
1000         wanted = OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
1001         if (wanted > fed->fed_grant) {
1002                 /* always try to book enough space to handle a large precreate
1003                  * request */
1004                 wanted -= fed->fed_grant;
1005                 ofd_grant(exp, fed->fed_grant, wanted, left, false);
1006         }
1007         spin_unlock(&ofd->ofd_grant_lock);
1008         RETURN(0);
1009 }
1010
1011 /**
1012  * Called at commit time to update pending grant counter for writes in flight
1013  *
1014  * \param env - is the lu environment provided by the caller
1015  * \param exp - is the export of the client which sent the request
1016  */
1017 void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp,
1018                       int rc)
1019 {
1020         struct ofd_device       *ofd  = ofd_exp(exp);
1021         struct ofd_thread_info  *info = ofd_info(env);
1022         unsigned long            pending;
1023
1024         ENTRY;
1025
1026         /* get space accounted in tot_pending for the I/O, set in
1027          * ofd_grant_check() */
1028         pending = info->fti_used;
1029         if (pending == 0)
1030                 RETURN_EXIT;
1031
1032         spin_lock(&ofd->ofd_grant_lock);
1033         /* Don't update statfs data for errors raised before commit (e.g.
1034          * bulk transfer failed, ...) since we know those writes have not been
1035          * processed. For other errors hit during commit, we cannot really tell
1036          * whether or not something was written, so we update statfs data.
1037          * In any case, this should not be fatal since we always get fresh
1038          * statfs data before failing a request with ENOSPC */
1039         if (rc == 0) {
1040                 spin_lock(&ofd->ofd_osfs_lock);
1041                 /* Take pending out of cached statfs data */
1042                 ofd->ofd_osfs.os_bavail -= min_t(obd_size,
1043                                                  ofd->ofd_osfs.os_bavail,
1044                                                  pending >> ofd->ofd_blockbits);
1045                 if (ofd->ofd_statfs_inflight)
1046                         /* someone is running statfs and want to be notified of
1047                          * writes happening meanwhile */
1048                         ofd->ofd_osfs_inflight += pending;
1049                 spin_unlock(&ofd->ofd_osfs_lock);
1050         }
1051
1052         if (exp->exp_filter_data.fed_pending < pending) {
1053                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1054                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1055                        exp->exp_filter_data.fed_pending, pending);
1056                 spin_unlock(&ofd->ofd_grant_lock);
1057                 LBUG();
1058         }
1059         exp->exp_filter_data.fed_pending -= pending;
1060
1061         if (ofd->ofd_tot_granted < pending) {
1062                  CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
1063                         "\n", exp->exp_obd->obd_name,
1064                         exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
1065                         pending);
1066                 spin_unlock(&ofd->ofd_grant_lock);
1067                 LBUG();
1068         }
1069         ofd->ofd_tot_granted -= pending;
1070
1071         if (ofd->ofd_tot_pending < pending) {
1072                  CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
1073                         "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1074                         exp, ofd->ofd_tot_pending, pending);
1075                 spin_unlock(&ofd->ofd_grant_lock);
1076                 LBUG();
1077         }
1078         ofd->ofd_tot_pending -= pending;
1079         spin_unlock(&ofd->ofd_grant_lock);
1080         EXIT;
1081 }