Whamcloud - gitweb
LU-1866 osd: ancillary work for initial OI scrub
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_grant.c
37  *
38  * Author: Johann Lombardi <johann@whamcloud..com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include "ofd_internal.h"
44
45 #define OFD_GRANT_CHUNK (2ULL * DT_MAX_BRW_SIZE)
46 #define OFD_GRANT_CHUNK_EXP(rexp) (2ULL * exp_brw_size((rexp)))
47 #define OFD_GRANT_SHRINK_LIMIT(rexp) (16ULL * OFD_GRANT_CHUNK_EXP((rexp)))
48
49 static inline obd_size ofd_grant_from_cli(struct obd_export *exp,
50                                           struct ofd_device *ofd, obd_size val)
51 {
52         if (ofd_grant_compat(exp, ofd))
53                 /* clients not supporting OBD_CONNECT_GRANT_PARAM actually
54                  * consume 4KB of grant per block, we should thus inflate
55                  * the grant counters to reflect what was actually consumed */
56                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
57         return val;
58 }
59
60 static inline obd_size ofd_grant_to_cli(struct obd_export *exp,
61                                         struct ofd_device *ofd, obd_size val)
62 {
63         if (ofd_grant_compat(exp, ofd))
64                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
65         return val;
66 }
67
68 static inline obd_size ofd_grant_chunk(struct obd_export *exp,
69                                        struct ofd_device *ofd)
70 {
71         if (exp && ofd_obd(ofd)->obd_self_export == exp)
72                 /* Grant enough space to handle a big precreate request */
73                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace;
74
75         if (exp && ofd_grant_compat(exp, ofd))
76                 /* Try to grant enough space to send a full-size RPC */
77                 return exp_brw_size(exp) <<
78                        (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
79         return OFD_GRANT_CHUNK;
80 }
81
82 /**
83  * Perform extra sanity checks for grant accounting. This is done at connect,
84  * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
85  * always get rid of it or turn it off when we know accounting is good.
86  *
87  * \param obd - is the device to check
88  * \param func - is the function to call if an inconsistency is found
89  */
90 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
91 {
92         struct filter_export_data       *fed;
93         struct ofd_device               *ofd = ofd_dev(obd->obd_lu_dev);
94         struct obd_export               *exp;
95         obd_size                         maxsize;
96         obd_size                         tot_dirty = 0;
97         obd_size                         tot_pending = 0;
98         obd_size                         tot_granted = 0;
99         obd_size                         fo_tot_dirty, fo_tot_pending;
100         obd_size                         fo_tot_granted;
101
102         if (cfs_list_empty(&obd->obd_exports))
103                 return;
104
105         /* We don't want to do this for large machines that do lots of
106          * mounts or unmounts.  It burns... */
107         if (obd->obd_num_exports > 100)
108                 return;
109
110         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
111
112         spin_lock(&obd->obd_dev_lock);
113         spin_lock(&ofd->ofd_grant_lock);
114         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
115                 int error = 0;
116
117                 fed = &exp->exp_filter_data;
118
119                 if (obd->obd_self_export == exp)
120                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
121                                "%ld\n", obd->obd_name, fed->fed_grant,
122                                fed->fed_pending, fed->fed_dirty);
123
124                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
125                     fed->fed_dirty < 0)
126                         error = 1;
127                 if (fed->fed_grant + fed->fed_pending > maxsize) {
128                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
129                                " > maxsize("LPU64")\n", obd->obd_name,
130                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
131                                fed->fed_pending, maxsize);
132                         spin_unlock(&obd->obd_dev_lock);
133                         spin_unlock(&ofd->ofd_grant_lock);
134                         LBUG();
135                 }
136                 if (fed->fed_dirty > maxsize) {
137                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
138                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
139                                exp, fed->fed_dirty, maxsize);
140                         spin_unlock(&obd->obd_dev_lock);
141                         spin_unlock(&ofd->ofd_grant_lock);
142                         LBUG();
143                 }
144                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
145                              "%ld pend %ld grant %ld\n", obd->obd_name,
146                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
147                              fed->fed_pending, fed->fed_grant);
148                 tot_granted += fed->fed_grant + fed->fed_pending;
149                 tot_pending += fed->fed_pending;
150                 tot_dirty += fed->fed_dirty;
151         }
152         spin_unlock(&obd->obd_dev_lock);
153         fo_tot_granted = ofd->ofd_tot_granted;
154         fo_tot_pending = ofd->ofd_tot_pending;
155         fo_tot_dirty = ofd->ofd_tot_dirty;
156
157         if (tot_granted != fo_tot_granted)
158                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
159                        func, tot_granted, fo_tot_granted);
160         if (tot_pending != fo_tot_pending)
161                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
162                        func, tot_pending, fo_tot_pending);
163         if (tot_dirty != fo_tot_dirty)
164                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
165                        func, tot_dirty, fo_tot_dirty);
166         if (tot_pending > tot_granted)
167                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
168                        func, tot_pending, tot_granted);
169         if (tot_granted > maxsize)
170                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
171                        func, tot_granted, maxsize);
172         if (tot_dirty > maxsize)
173                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
174                        func, tot_dirty, maxsize);
175         spin_unlock(&ofd->ofd_grant_lock);
176 }
177
178 /**
179  * Get fresh statfs information from the OSD layer if the cache is older than 1s
180  * or if force is set. The OSD layer is in charge of estimating data & metadata
181  * overhead.
182  *
183  * \param env - is the lu environment passed by the caller
184  * \param exp - export used to print client info in debug messages
185  * \param force - is used to force a refresh of statfs information
186  * \param from_cache - returns whether the statfs information are
187  *                   taken from cache
188  */
189 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
190                              int force, int *from_cache)
191 {
192         struct obd_device       *obd = exp->exp_obd;
193         struct ofd_device       *ofd = ofd_exp(exp);
194         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
195         __u64                    max_age;
196         int                      rc;
197
198         if (force)
199                 max_age = 0; /* get fresh statfs data */
200         else
201                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
202
203         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
204         if (unlikely(rc)) {
205                 *from_cache = 0;
206                 return;
207         }
208
209         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
210                obd->obd_name, exp->exp_client_uuid.uuid, exp,
211                osfs->os_bfree << ofd->ofd_blockbits,
212                osfs->os_bavail << ofd->ofd_blockbits);
213 }
214
215 /**
216  * Figure out how much space is available on the backend filesystem.
217  * This is done by accessing cached statfs data previously populated by
218  * ofd_grant_statfs(), from which we withdraw the space already granted to
219  * clients and the reserved space.
220  *
221  * \param exp - export which received the write request
222  */
223 static obd_size ofd_grant_space_left(struct obd_export *exp)
224 {
225         struct obd_device       *obd = exp->exp_obd;
226         struct ofd_device       *ofd = ofd_exp(exp);
227         obd_size                 tot_granted;
228         obd_size                 left, avail;
229         obd_size                 unstable;
230
231         ENTRY;
232         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
233
234         spin_lock(&ofd->ofd_osfs_lock);
235         /* get available space from cached statfs data */
236         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
237         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
238         spin_unlock(&ofd->ofd_osfs_lock);
239
240         tot_granted = ofd->ofd_tot_granted;
241
242         if (left < tot_granted) {
243                 int mask = (left + unstable <
244                             tot_granted - ofd->ofd_tot_pending) ?
245                             D_ERROR : D_CACHE;
246
247                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
248                              LPU64" unstable "LPU64" pending "LPU64"\n",
249                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
250                              left, tot_granted, unstable,
251                              ofd->ofd_tot_pending);
252                 RETURN(0);
253         }
254
255         avail = left;
256         /* Withdraw space already granted to clients */
257         left -= tot_granted;
258
259         /* If the left space is below the grant threshold x available space,
260          * stop granting space to clients.
261          * The purpose of this threshold is to keep some error margin on the
262          * overhead estimate made by the OSD layer. If we grant all the free
263          * space, we have no way (grant space cannot be revoked yet) to
264          * adjust if the write overhead has been underestimated. */
265         left -= min_t(obd_size, left, ofd_grant_reserved(ofd, avail));
266
267         /* Align left on block size */
268         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
269
270         CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
271                LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
272                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
273                tot_granted, ofd->ofd_tot_pending);
274
275         RETURN(left);
276 }
277
278 /**
279  * Grab the dirty and seen grant announcements from the incoming obdo.
280  * We will later calculate the client's new grant and return it.
281  * Caller must hold ofd_grant_lock spinlock.
282  *
283  * \param env - is the lu environment supplying osfs storage
284  * \param exp - is the export for which we received the request
285  * \paral oa - is the incoming obdo sent by the client
286  */
287 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
288                                struct obdo *oa)
289 {
290         struct filter_export_data       *fed;
291         struct ofd_device               *ofd = ofd_exp(exp);
292         struct obd_device               *obd = exp->exp_obd;
293         long                             dirty, dropped, grant_chunk;
294         ENTRY;
295
296         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
297
298         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
299                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
300                 oa->o_valid &= ~OBD_MD_FLGRANT;
301                 RETURN_EXIT;
302         }
303
304         fed = &exp->exp_filter_data;
305
306         /* Add some margin, since there is a small race if other RPCs arrive
307          * out-or-order and have already consumed some grant.  We want to
308          * leave this here in case there is a large error in accounting. */
309         CDEBUG(D_CACHE,
310                "%s: cli %s/%p reports grant "LPU64" dropped %u, local %lu\n",
311                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
312                oa->o_dropped, fed->fed_grant);
313
314         if ((long long)oa->o_dirty < 0)
315                 oa->o_dirty = 0;
316
317         dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
318         dropped     = ofd_grant_from_cli(exp, ofd, (obd_size)oa->o_dropped);
319         grant_chunk = ofd_grant_chunk(exp, ofd);
320
321         /* Update our accounting now so that statfs takes it into account.
322          * Note that fed_dirty is only approximate and can become incorrect
323          * if RPCs arrive out-of-order.  No important calculations depend
324          * on fed_dirty however, but we must check sanity to not assert. */
325         if (dirty > fed->fed_grant + 4 * grant_chunk)
326                 dirty = fed->fed_grant + 4 * grant_chunk;
327         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
328         if (fed->fed_grant < dropped) {
329                 CDEBUG(D_CACHE,
330                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
331                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
332                        fed->fed_grant);
333                 dropped = 0;
334         }
335         if (ofd->ofd_tot_granted < dropped) {
336                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant "LPU64
337                        "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
338                        dropped, ofd->ofd_tot_granted);
339                 dropped = 0;
340         }
341         ofd->ofd_tot_granted -= dropped;
342         fed->fed_grant -= dropped;
343         fed->fed_dirty = dirty;
344
345         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
346                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
347                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
348                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
349                 spin_unlock(&ofd->ofd_grant_lock);
350                 LBUG();
351         }
352         EXIT;
353 }
354
355 /**
356  * Called when the client is able to release some grants. Proceed with the
357  * shrink request when there is less ungranted space remaining
358  * than the amount all of the connected clients would consume if they
359  * used their full grant.
360  *
361  * \param exp - is the export for which we received the request
362  * \paral oa - is the incoming obdo sent by the client
363  * \param left_space - is the remaining free space with space already granted
364  *                   taken out
365  */
366 static void ofd_grant_shrink(struct obd_export *exp,
367                              struct obdo *oa, obd_size left_space)
368 {
369         struct filter_export_data       *fed;
370         struct ofd_device               *ofd = ofd_exp(exp);
371         struct obd_device               *obd = exp->exp_obd;
372         long                             grant_shrink;
373
374         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
375         LASSERT(exp);
376         if (left_space >= ofd->ofd_tot_granted_clients *
377                           OFD_GRANT_SHRINK_LIMIT(exp))
378                 return;
379
380         grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
381
382         fed = &exp->exp_filter_data;
383         fed->fed_grant       -= grant_shrink;
384         ofd->ofd_tot_granted -= grant_shrink;
385
386         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
387                LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
388                exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
389
390         /* client has just released some grant, don't grant any space back */
391         oa->o_grant = 0;
392 }
393
394 /**
395  * Calculate how much space is required to write a given network buffer
396  */
397 static inline int ofd_grant_rnb_size(struct obd_export *exp,
398                                      struct ofd_device *ofd,
399                                      struct niobuf_remote *rnb)
400 {
401         obd_size blocksize, bytes, end;
402
403         if (exp && ofd_grant_compat(exp, ofd))
404                 blocksize = 1ULL << COMPAT_BSIZE_SHIFT;
405         else
406                 blocksize = 1ULL << ofd->ofd_blockbits;
407
408         /* The network buffer might span several blocks, align it on block
409          * boundaries */
410         bytes  = rnb->rnb_offset & (blocksize - 1);
411         bytes += rnb->rnb_len;
412         end    = bytes & (blocksize - 1);
413         if (end)
414                 bytes += blocksize - end;
415         if (exp)
416                 /* Apply per-export pecularities if one is given */
417                 bytes = ofd_grant_from_cli(exp, ofd, (obd_size)bytes);
418         return bytes;
419 }
420
421
422 /**
423  * When clients have dirtied as much space as they've been granted they
424  * fall through to sync writes.  These sync writes haven't been expressed
425  * in grants and need to error with ENOSPC when there isn't room in the
426  * filesystem for them after grants are taken into account.  However,
427  * writeback of the dirty data that was already granted space can write
428  * right on through.
429  * Caller must hold ofd_grant_lock spinlock.
430  *
431  * \param env - is the lu environment passed by the caller
432  * \param exp - is the export identifying the client which sent the RPC
433  * \param oa  - is the incoming obdo in which we should return the pack the
434  *            additional grant
435  * \param rnb - is the list of network buffers
436  * \param niocont - is the number of network buffers in the list
437  * \param left - is the remaining free space with space already granted
438  *             taken out
439  */
440 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
441                             struct obdo *oa, struct niobuf_remote *rnb,
442                             int niocount, obd_size *left)
443 {
444         struct filter_export_data       *fed = &exp->exp_filter_data;
445         struct obd_device               *obd = exp->exp_obd;
446         struct ofd_device               *ofd = ofd_exp(exp);
447         unsigned long                    ungranted = 0;
448         unsigned long                    granted = 0;
449         int                              i;
450         int                              resend = 0;
451         struct ofd_thread_info          *info = ofd_info(env);
452
453         ENTRY;
454
455         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
456
457         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
458             (oa->o_flags & OBD_FL_RECOV_RESEND)) {
459                 resend = 1;
460                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
461                                 "accounting\n");
462         }
463
464         for (i = 0; i < niocount; i++) {
465                 int bytes;
466
467                 if (obd->obd_recovering) {
468                         /* Replaying write. Grant info have been processed
469                          * already so no need to do any enforcement here.
470                          * It is worth noting that only bulk writes with all
471                          * rnbs having OBD_BRW_FROM_GRANT can be replayed.
472                          * If one page hasn't OBD_BRW_FROM_GRANT set, then
473                          * the whole bulk is written synchronously */
474                         if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
475                                  rnb[i].rnb_flags |= OBD_BRW_GRANTED;
476                                  continue;
477                         } else {
478                                 CERROR("%s: cli %s is replaying OST_WRITE "
479                                        "while one rnb hasn't OBD_BRW_FROM_GRANT"
480                                        " set (0x%x)\n", exp->exp_obd->obd_name,
481                                         exp->exp_client_uuid.uuid,
482                                         rnb[i].rnb_flags);
483
484                         }
485                 } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
486                            (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
487                         if (resend) {
488                                 /* This is a recoverable resend so grant
489                                  * information have already been processed */
490                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
491                                 continue;
492                         }
493
494                         /* inflate consumed space if needed */
495                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
496                         if (fed->fed_grant < granted + bytes) {
497                                 CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
498                                        "GRANT, real grant %lu idx %d\n",
499                                        exp->exp_obd->obd_name,
500                                        exp->exp_client_uuid.uuid, exp,
501                                        granted, bytes, fed->fed_grant, i);
502                         } else {
503                                 granted += bytes;
504                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
505                                 continue;
506                         }
507                 }
508
509                 /* Consume grant space on the server.
510                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
511                  * so that the required grant space isn't inflated. This is
512                  * done on purpose since the server can deal with large block
513                  * size, unlike some clients */
514                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
515                 if (*left > ungranted + bytes) {
516                         /* if enough space, pretend it was granted */
517                         ungranted += bytes;
518                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
519                         continue;
520                 }
521
522                 /* We can't check for already-mapped blocks here (make sense
523                  * when backend filesystem does not use COW) as it requires
524                  * dropping the grant lock.
525                  * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
526                  * to go through and verify if all of the blocks not marked
527                  *  BRW_GRANTED are already mapped and we can ignore this error.
528                  */
529                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
530                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
531                                 exp->exp_obd->obd_name,
532                                 exp->exp_client_uuid.uuid, exp, i, bytes);
533         }
534
535         /* record space used for the I/O, will be used in ofd_grant_commmit() */
536         /* Now substract what the clients has used already.  We don't subtract
537          * this from the tot_granted yet, so that other client's can't grab
538          * that space before we have actually allocated our blocks. That
539          * happens in ofd_grant_commit() after the writes are done. */
540         info->fti_used = granted + ungranted;
541         *left -= ungranted;
542         fed->fed_grant -= granted;
543         fed->fed_pending += info->fti_used;
544         ofd->ofd_tot_granted += ungranted;
545         ofd->ofd_tot_pending += info->fti_used;
546
547         CDEBUG(D_CACHE,
548                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
549                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
550                granted, ungranted, fed->fed_grant, fed->fed_dirty);
551
552         if (obd->obd_recovering)
553                 /* don't update dirty accounting during recovery */
554                 RETURN_EXIT;
555
556         if (fed->fed_dirty < granted) {
557                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
558                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
559                        granted, fed->fed_dirty);
560                 granted = fed->fed_dirty;
561         }
562         ofd->ofd_tot_dirty -= granted;
563         fed->fed_dirty -= granted;
564
565         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
566                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
567                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
568                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
569                 spin_unlock(&ofd->ofd_grant_lock);
570                 LBUG();
571         }
572         EXIT;
573 }
574
575 /**
576  * Calculate how much grant space to return to client, based on how much space
577  * is currently free and how much of that is already granted.
578  * Caller must hold ofd_grant_lock spinlock.
579  *
580  * \param exp - is the export of the client which sent the request
581  * \param curgrant - is the current grant claimed by the client
582  * \param want - is how much grant space the client would like to have
583  * \param left - is the remaining free space with granted space taken out
584  */
585 static long ofd_grant(struct obd_export *exp, obd_size curgrant,
586                       obd_size want, obd_size left)
587 {
588         struct obd_device               *obd = exp->exp_obd;
589         struct ofd_device               *ofd = ofd_exp(exp);
590         struct filter_export_data       *fed = &exp->exp_filter_data;
591         long                             grant_chunk;
592         obd_size                         grant;
593
594         ENTRY;
595
596         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
597                 RETURN(0);
598
599         if (want > 0x7fffffff) {
600                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
601                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
602                 RETURN(0);
603         }
604
605         /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
606          * size while the reality is different */
607         curgrant    = ofd_grant_from_cli(exp, ofd, curgrant);
608         want    = ofd_grant_from_cli(exp, ofd, want);
609         grant_chunk = ofd_grant_chunk(exp, ofd);
610
611         /* Grant some fraction of the client's requested grant space so that
612          * they are not always waiting for write credits (not all of it to
613          * avoid overgranting in face of multiple RPCs in flight).  This
614          * essentially will be able to control the OSC_MAX_RIF for a client.
615          *
616          * If we do have a large disparity between what the client thinks it
617          * has and what we think it has, don't grant very much and let the
618          * client consume its grant first.  Either it just has lots of RPCs
619          * in flight, or it was evicted and its grants will soon be used up. */
620         if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
621                    RETURN(0);
622
623         if (!obd->obd_recovering)
624                 /* don't grant more than 1/8th of the remaining free space in
625                  * one chunk */
626                 left >>= 3;
627         grant = min(want, left);
628         /* align grant on block size */
629         grant &= ~((1ULL << ofd->ofd_blockbits) - 1);
630
631         if (!grant)
632                 RETURN(0);
633
634         /* Allow >OFD_GRANT_CHUNK_EXP size when clients reconnect due to a
635          * server reboot. */
636         if ((grant > grant_chunk) && (!obd->obd_recovering))
637                 grant = grant_chunk;
638
639         ofd->ofd_tot_granted += grant;
640         fed->fed_grant += grant;
641
642         if (fed->fed_grant < 0) {
643                 CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
644                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
645                        fed->fed_grant, want, curgrant);
646                 spin_unlock(&ofd->ofd_grant_lock);
647                 LBUG();
648         }
649
650         CDEBUG(D_CACHE,
651                "%s: cli %s/%p wants: "LPU64" current grant "LPU64
652                " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
653                exp, want, curgrant, grant);
654         CDEBUG(D_CACHE,
655                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
656                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
657                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
658                obd->obd_num_exports);
659
660         RETURN(ofd_grant_to_cli(exp, ofd, grant));
661 }
662
663 /**
664  * Client connection or reconnection.
665  *
666  * \param env - is the lu environment provided by the caller
667  * \param exp - is the client's export which is reconnecting
668  * \param want - is how much the client would like to get
669  */
670 long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
671                        obd_size want)
672 {
673         struct ofd_device               *ofd = ofd_exp(exp);
674         struct filter_export_data       *fed = &exp->exp_filter_data;
675         obd_size                         left = 0;
676         long                             grant;
677         int                              from_cache;
678         int                              force = 0; /* can use cached data */
679
680         /* don't grant space to client with read-only access */
681         if ((exp_connect_flags(exp) & OBD_CONNECT_RDONLY) ||
682             ofd_grant_prohibit(exp, ofd))
683                 return 0;
684
685 refresh:
686         ofd_grant_statfs(env, exp, force, &from_cache);
687
688         spin_lock(&ofd->ofd_grant_lock);
689
690         /* Grab free space from cached info and take out space already granted
691          * to clients as well as reserved space */
692         left = ofd_grant_space_left(exp);
693
694         /* get fresh statfs data if we are short in ungranted space */
695         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
696                 spin_unlock(&ofd->ofd_grant_lock);
697                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
698                 force = 1;
699                 goto refresh;
700         }
701
702         ofd_grant(exp, ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant),
703                   want, left);
704
705         /* return to client its current grant */
706         grant = ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant);
707         ofd->ofd_tot_granted_clients++;
708
709         spin_unlock(&ofd->ofd_grant_lock);
710
711         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
712                LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
713                exp, grant, want, left);
714
715         return grant;
716 }
717
718 /**
719  * Remove a client from the grant accounting totals.  We also remove
720  * the export from the obd device under the osfs and dev locks to ensure
721  * that the ofd_grant_sanity_check() calculations are always valid.
722  * The client should do something similar when it invalidates its import.
723  *
724  * \param exp - is the client's export to remove from grant accounting
725  */
726 void ofd_grant_discard(struct obd_export *exp)
727 {
728         struct obd_device               *obd = exp->exp_obd;
729         struct ofd_device               *ofd = ofd_exp(exp);
730         struct filter_export_data       *fed = &exp->exp_filter_data;
731
732         spin_lock(&ofd->ofd_grant_lock);
733         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
734                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
735                  obd->obd_name, ofd->ofd_tot_granted,
736                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
737         ofd->ofd_tot_granted -= fed->fed_grant;
738         fed->fed_grant = 0;
739         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
740                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
741                  obd->obd_name, ofd->ofd_tot_pending,
742                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
743         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
744          * finishes */
745         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
746                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
747                  obd->obd_name, ofd->ofd_tot_dirty,
748                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
749         ofd->ofd_tot_dirty -= fed->fed_dirty;
750         fed->fed_dirty = 0;
751         spin_unlock(&ofd->ofd_grant_lock);
752 }
753
754 /**
755  * Called at prepare time when handling read request. This function extracts
756  * incoming grant information from the obdo and processes the grant shrink
757  * request, if any.
758  *
759  * \param env - is the lu environment provided by the caller
760  * \param exp - is the export of the client which sent the request
761  * \paral oa - is the incoming obdo sent by the client
762  */
763 void ofd_grant_prepare_read(const struct lu_env *env,
764                             struct obd_export *exp, struct obdo *oa)
765 {
766         struct ofd_device       *ofd = ofd_exp(exp);
767         int                      do_shrink;
768         obd_size                 left = 0;
769
770         if (!oa)
771                 return;
772
773         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
774                 /* The read request does not contain any grant
775                  * information */
776                 return;
777
778         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
779             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
780                 /* To process grant shrink request, we need to know how much
781                  * available space remains on the backend filesystem.
782                  * Shrink requests are not so common, we always get fresh
783                  * statfs information. */
784                 ofd_grant_statfs(env, exp, 1, NULL);
785
786                 /* protect all grant counters */
787                 spin_lock(&ofd->ofd_grant_lock);
788
789                 /* Grab free space from cached statfs data and take out space
790                  * already granted to clients as well as reserved space */
791                 left = ofd_grant_space_left(exp);
792
793                 /* all set now to proceed with shrinking */
794                 do_shrink = 1;
795         } else {
796                 /* no grant shrinking request packed in the obdo and
797                  * since we don't grant space back on reads, no point
798                  * in running statfs, so just skip it and process
799                  * incoming grant data directly. */
800                 spin_lock(&ofd->ofd_grant_lock);
801                 do_shrink = 0;
802         }
803
804         /* extract incoming grant infomation provided by the client */
805         ofd_grant_incoming(env, exp, oa);
806
807         /* unlike writes, we don't return grants back on reads unless a grant
808          * shrink request was packed and we decided to turn it down. */
809         if (do_shrink)
810                 ofd_grant_shrink(exp, oa, left);
811         else
812                 oa->o_grant = 0;
813
814         spin_unlock(&ofd->ofd_grant_lock);
815 }
816
817 /**
818  * Called at write prepare time to handle incoming grant, check that we have
819  * enough space and grant some space back to the client if possible.
820  *
821  * \param env - is the lu environment provided by the caller
822  * \param exp - is the export of the client which sent the request
823  * \paral oa - is the incoming obdo sent by the client
824  * \param rnb - is the list of network buffers
825  * \param niocont - is the number of network buffers in the list
826  */
827 void ofd_grant_prepare_write(const struct lu_env *env,
828                              struct obd_export *exp, struct obdo *oa,
829                              struct niobuf_remote *rnb, int niocount)
830 {
831         struct obd_device       *obd = exp->exp_obd;
832         struct ofd_device       *ofd = ofd_exp(exp);
833         obd_size                 left;
834         int                      from_cache;
835         int                      force = 0; /* can use cached data intially */
836         int                      rc;
837
838         ENTRY;
839
840 refresh:
841         /* get statfs information from OSD layer */
842         ofd_grant_statfs(env, exp, force, &from_cache);
843
844         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
845
846         /* Grab free space from cached statfs data and take out space already
847          * granted to clients as well as reserved space */
848         left = ofd_grant_space_left(exp);
849
850         /* Get fresh statfs data if we are short in ungranted space */
851         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
852                 spin_unlock(&ofd->ofd_grant_lock);
853                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
854                        obd->obd_name);
855                 force = 1;
856                 goto refresh;
857         }
858
859         /* When close to free space exhaustion, trigger a sync to force
860          * writeback cache to consume required space immediately and release as
861          * much space as possible. */
862         if (!obd->obd_recovering && force != 2 &&
863             left < ofd_grant_chunk(NULL, ofd)) {
864                 bool from_grant = true;
865                 int  i;
866
867                 /* That said, it is worth running a sync only if some pages did
868                  * not consume grant space on the client and could thus fail
869                  * with ENOSPC later in ofd_grant_check() */
870                 for (i = 0; i < niocount; i++)
871                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
872                                 from_grant = false;
873
874                 if (!from_grant) {
875                         /* at least one network buffer requires acquiring grant
876                          * space on the server */
877                         spin_unlock(&ofd->ofd_grant_lock);
878                         /* discard errors, at least we tried ... */
879                         rc = dt_sync(env, ofd->ofd_osd);
880                         force = 2;
881                         goto refresh;
882                 }
883         }
884
885         /* extract incoming grant information provided by the client */
886         ofd_grant_incoming(env, exp, oa);
887
888         /* check limit */
889         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
890
891         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
892                 spin_unlock(&ofd->ofd_grant_lock);
893                 RETURN_EXIT;
894         }
895
896         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
897          * grant space. */
898         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
899             (oa->o_flags & OBD_FL_SHRINK_GRANT))
900                 ofd_grant_shrink(exp, oa, left);
901         else
902                 /* grant more space back to the client if possible */
903                 oa->o_grant = ofd_grant(exp, oa->o_grant, oa->o_undirty, left);
904         spin_unlock(&ofd->ofd_grant_lock);
905 }
906
907 /**
908  * Called during object precreation to consume grant space.
909  * More space is granted for precreation if possible.
910  *
911  * \param env - is the lu environment provided by the caller
912  * \param exp - is the export holding the grant space for precreation (= self
913  *            export currently)
914  * \paral nr - is the number of objects the caller wants to create objects
915  */
916 int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
917 {
918         struct ofd_thread_info          *info = ofd_info(env);
919         struct ofd_device               *ofd = ofd_exp(exp);
920         struct filter_export_data       *fed = &exp->exp_filter_data;
921         obd_size                         left = 0;
922         unsigned long                    wanted;
923
924         ENTRY;
925
926         info->fti_used = 0;
927
928         if (exp->exp_obd->obd_recovering ||
929             ofd->ofd_dt_conf.ddp_inodespace == 0)
930                 /* don't enforce grant during recovery */
931                 RETURN(0);
932
933         /* Update statfs data if required */
934         ofd_grant_statfs(env, exp, 1, NULL);
935
936         /* protect all grant counters */
937         spin_lock(&ofd->ofd_grant_lock);
938
939         /* fail precreate request if there is not enough blocks available for
940          * writing */
941         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
942             (ofd->ofd_osfs.os_blocks >> 10)) {
943                 spin_unlock(&ofd->ofd_grant_lock);
944                 CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
945                        ofd_obd(ofd)->obd_name,
946                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
947                 RETURN(-ENOSPC);
948         }
949
950         /* Grab free space from cached statfs data and take out space
951          * already granted to clients as well as reserved space */
952         left = ofd_grant_space_left(exp);
953
954         /* compute how much space is required to handle the precreation
955          * request */
956         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
957         if (wanted > fed->fed_grant + left) {
958                 /* that's beyond what remains, adjust the number of objects that
959                  * can be safely precreated */
960                 wanted = fed->fed_grant + left;
961                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
962                 if (*nr == 0) {
963                         /* we really have no space any more for precreation,
964                          * fail the precreate request with ENOSPC */
965                         spin_unlock(&ofd->ofd_grant_lock);
966                         RETURN(-ENOSPC);
967                 }
968                 /* compute space needed for the new number of creations */
969                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
970         }
971         LASSERT(wanted <= fed->fed_grant + left);
972
973         if (wanted <= fed->fed_grant) {
974                 /* we've enough grant space to handle this precreate request */
975                 fed->fed_grant -= wanted;
976         } else {
977                 /* we need to take some space from the ungranted pool */
978                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
979                 left -= wanted - fed->fed_grant;
980                 fed->fed_grant = 0;
981         }
982         info->fti_used = wanted;
983         fed->fed_pending += info->fti_used;
984         ofd->ofd_tot_pending += info->fti_used;
985
986         /* grant more space (twice as much as needed for this request) for
987          * precreate purpose if possible */
988         ofd_grant(exp, fed->fed_grant, wanted * 2, left);
989         spin_unlock(&ofd->ofd_grant_lock);
990         RETURN(0);
991 }
992
993 /**
994  * Called at commit time to update pending grant counter for writes in flight
995  *
996  * \param env - is the lu environment provided by the caller
997  * \param exp - is the export of the client which sent the request
998  */
999 void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp,
1000                       int rc)
1001 {
1002         struct ofd_device       *ofd  = ofd_exp(exp);
1003         struct ofd_thread_info  *info = ofd_info(env);
1004         unsigned long            pending;
1005
1006         ENTRY;
1007
1008         /* get space accounted in tot_pending for the I/O, set in
1009          * ofd_grant_check() */
1010         pending = info->fti_used;
1011         if (pending == 0)
1012                 RETURN_EXIT;
1013
1014         spin_lock(&ofd->ofd_grant_lock);
1015         /* Don't update statfs data for errors raised before commit (e.g.
1016          * bulk transfer failed, ...) since we know those writes have not been
1017          * processed. For other errors hit during commit, we cannot really tell
1018          * whether or not something was written, so we update statfs data.
1019          * In any case, this should not be fatal since we always get fresh
1020          * statfs data before failing a request with ENOSPC */
1021         if (rc == 0) {
1022                 spin_lock(&ofd->ofd_osfs_lock);
1023                 /* Take pending out of cached statfs data */
1024                 ofd->ofd_osfs.os_bavail -= min_t(obd_size,
1025                                                  ofd->ofd_osfs.os_bavail,
1026                                                  pending >> ofd->ofd_blockbits);
1027                 if (ofd->ofd_statfs_inflight)
1028                         /* someone is running statfs and want to be notified of
1029                          * writes happening meanwhile */
1030                         ofd->ofd_osfs_inflight += pending;
1031                 spin_unlock(&ofd->ofd_osfs_lock);
1032         }
1033
1034         if (exp->exp_filter_data.fed_pending < pending) {
1035                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1036                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1037                        exp->exp_filter_data.fed_pending, pending);
1038                 spin_unlock(&ofd->ofd_grant_lock);
1039                 LBUG();
1040         }
1041         exp->exp_filter_data.fed_pending -= pending;
1042
1043         if (ofd->ofd_tot_granted < pending) {
1044                  CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
1045                         "\n", exp->exp_obd->obd_name,
1046                         exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
1047                         pending);
1048                 spin_unlock(&ofd->ofd_grant_lock);
1049                 LBUG();
1050         }
1051         ofd->ofd_tot_granted -= pending;
1052
1053         if (ofd->ofd_tot_pending < pending) {
1054                  CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
1055                         "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1056                         exp, ofd->ofd_tot_pending, pending);
1057                 spin_unlock(&ofd->ofd_grant_lock);
1058                 LBUG();
1059         }
1060         ofd->ofd_tot_pending -= pending;
1061         spin_unlock(&ofd->ofd_grant_lock);
1062         EXIT;
1063 }