Whamcloud - gitweb
LU-2371 ptlrpc: get new xid for resend on EINPROGRESS
[fs/lustre-release.git] / lustre / ofd / ofd_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_grant.c
37  *
38  * Author: Johann Lombardi <johann@whamcloud..com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include "ofd_internal.h"
44
45 #define OFD_GRANT_CHUNK (2ULL * PTLRPC_MAX_BRW_SIZE)
46 #define OFD_GRANT_SHRINK_LIMIT (16ULL * OFD_GRANT_CHUNK)
47
48 static inline obd_size ofd_grant_from_cli(struct obd_export *exp,
49                                           struct ofd_device *ofd, obd_size val)
50 {
51         if (ofd_grant_compat(exp, ofd))
52                 /* clients not supporting OBD_CONNECT_GRANT_PARAM actually
53                  * consume 4KB of grant per block, we should thus inflate
54                  * the grant counters to reflect what was actually consumed */
55                 return val << (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
56         return val;
57 }
58
59 static inline obd_size ofd_grant_to_cli(struct obd_export *exp,
60                                         struct ofd_device *ofd, obd_size val)
61 {
62         if (ofd_grant_compat(exp, ofd))
63                 return val >> (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
64         return val;
65 }
66
67 static inline obd_size ofd_grant_chunk(struct obd_export *exp,
68                                        struct ofd_device *ofd)
69 {
70         if (exp && ofd_obd(ofd)->obd_self_export == exp)
71                 /* Grant enough space to handle a big precreate request */
72                 return OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace;
73
74         if (exp && ofd_grant_compat(exp, ofd))
75                 /* Try to grant enough space to send a full-size RPC */
76                 return PTLRPC_MAX_BRW_SIZE <<
77                        (ofd->ofd_blockbits - COMPAT_BSIZE_SHIFT);
78         return OFD_GRANT_CHUNK;
79 }
80
81 /**
82  * Perform extra sanity checks for grant accounting. This is done at connect,
83  * disconnect, and statfs RPC time, so it shouldn't be too bad. We can
84  * always get rid of it or turn it off when we know accounting is good.
85  *
86  * \param obd - is the device to check
87  * \param func - is the function to call if an inconsistency is found
88  */
89 void ofd_grant_sanity_check(struct obd_device *obd, const char *func)
90 {
91         struct filter_export_data       *fed;
92         struct ofd_device               *ofd = ofd_dev(obd->obd_lu_dev);
93         struct obd_export               *exp;
94         obd_size                         maxsize;
95         obd_size                         tot_dirty = 0;
96         obd_size                         tot_pending = 0;
97         obd_size                         tot_granted = 0;
98         obd_size                         fo_tot_dirty, fo_tot_pending;
99         obd_size                         fo_tot_granted;
100
101         if (cfs_list_empty(&obd->obd_exports))
102                 return;
103
104         /* We don't want to do this for large machines that do lots of
105          * mounts or unmounts.  It burns... */
106         if (obd->obd_num_exports > 100)
107                 return;
108
109         maxsize = ofd->ofd_osfs.os_blocks << ofd->ofd_blockbits;
110
111         spin_lock(&obd->obd_dev_lock);
112         spin_lock(&ofd->ofd_grant_lock);
113         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
114                 int error = 0;
115
116                 fed = &exp->exp_filter_data;
117
118                 if (obd->obd_self_export == exp)
119                         CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
120                                "%ld\n", obd->obd_name, fed->fed_grant,
121                                fed->fed_pending, fed->fed_dirty);
122
123                 if (fed->fed_grant < 0 || fed->fed_pending < 0 ||
124                     fed->fed_dirty < 0)
125                         error = 1;
126                 if (fed->fed_grant + fed->fed_pending > maxsize) {
127                         CERROR("%s: cli %s/%p fed_grant(%ld) + fed_pending(%ld)"
128                                " > maxsize("LPU64")\n", obd->obd_name,
129                                exp->exp_client_uuid.uuid, exp, fed->fed_grant,
130                                fed->fed_pending, maxsize);
131                         spin_unlock(&obd->obd_dev_lock);
132                         spin_unlock(&ofd->ofd_grant_lock);
133                         LBUG();
134                 }
135                 if (fed->fed_dirty > maxsize) {
136                         CERROR("%s: cli %s/%p fed_dirty(%ld) > maxsize("LPU64
137                                ")\n", obd->obd_name, exp->exp_client_uuid.uuid,
138                                exp, fed->fed_dirty, maxsize);
139                         spin_unlock(&obd->obd_dev_lock);
140                         spin_unlock(&ofd->ofd_grant_lock);
141                         LBUG();
142                 }
143                 CDEBUG_LIMIT(error ? D_ERROR : D_CACHE, "%s: cli %s/%p dirty "
144                              "%ld pend %ld grant %ld\n", obd->obd_name,
145                              exp->exp_client_uuid.uuid, exp, fed->fed_dirty,
146                              fed->fed_pending, fed->fed_grant);
147                 tot_granted += fed->fed_grant + fed->fed_pending;
148                 tot_pending += fed->fed_pending;
149                 tot_dirty += fed->fed_dirty;
150         }
151         spin_unlock(&obd->obd_dev_lock);
152         fo_tot_granted = ofd->ofd_tot_granted;
153         fo_tot_pending = ofd->ofd_tot_pending;
154         fo_tot_dirty = ofd->ofd_tot_dirty;
155
156         if (tot_granted != fo_tot_granted)
157                 CERROR("%s: tot_granted "LPU64" != fo_tot_granted "LPU64"\n",
158                        func, tot_granted, fo_tot_granted);
159         if (tot_pending != fo_tot_pending)
160                 CERROR("%s: tot_pending "LPU64" != fo_tot_pending "LPU64"\n",
161                        func, tot_pending, fo_tot_pending);
162         if (tot_dirty != fo_tot_dirty)
163                 CERROR("%s: tot_dirty "LPU64" != fo_tot_dirty "LPU64"\n",
164                        func, tot_dirty, fo_tot_dirty);
165         if (tot_pending > tot_granted)
166                 CERROR("%s: tot_pending "LPU64" > tot_granted "LPU64"\n",
167                        func, tot_pending, tot_granted);
168         if (tot_granted > maxsize)
169                 CERROR("%s: tot_granted "LPU64" > maxsize "LPU64"\n",
170                        func, tot_granted, maxsize);
171         if (tot_dirty > maxsize)
172                 CERROR("%s: tot_dirty "LPU64" > maxsize "LPU64"\n",
173                        func, tot_dirty, maxsize);
174         spin_unlock(&ofd->ofd_grant_lock);
175 }
176
177 /**
178  * Get fresh statfs information from the OSD layer if the cache is older than 1s
179  * or if force is set. The OSD layer is in charge of estimating data & metadata
180  * overhead.
181  *
182  * \param env - is the lu environment passed by the caller
183  * \param exp - export used to print client info in debug messages
184  * \param force - is used to force a refresh of statfs information
185  * \param from_cache - returns whether the statfs information are
186  *                   taken from cache
187  */
188 static void ofd_grant_statfs(const struct lu_env *env, struct obd_export *exp,
189                              int force, int *from_cache)
190 {
191         struct obd_device       *obd = exp->exp_obd;
192         struct ofd_device       *ofd = ofd_exp(exp);
193         struct obd_statfs       *osfs = &ofd_info(env)->fti_u.osfs;
194         __u64                    max_age;
195         int                      rc;
196
197         if (force)
198                 max_age = 0; /* get fresh statfs data */
199         else
200                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
201
202         rc = ofd_statfs_internal(env, ofd, osfs, max_age, from_cache);
203         if (unlikely(rc)) {
204                 *from_cache = 0;
205                 return;
206         }
207
208         CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64"\n",
209                obd->obd_name, exp->exp_client_uuid.uuid, exp,
210                osfs->os_bfree << ofd->ofd_blockbits,
211                osfs->os_bavail << ofd->ofd_blockbits);
212 }
213
214 /**
215  * Figure out how much space is available on the backend filesystem.
216  * This is done by accessing cached statfs data previously populated by
217  * ofd_grant_statfs(), from which we withdraw the space already granted to
218  * clients and the reserved space.
219  *
220  * \param exp - export which received the write request
221  */
222 static obd_size ofd_grant_space_left(struct obd_export *exp)
223 {
224         struct obd_device       *obd = exp->exp_obd;
225         struct ofd_device       *ofd = ofd_exp(exp);
226         obd_size                 tot_granted;
227         obd_size                 left, avail;
228         obd_size                 unstable;
229
230         ENTRY;
231         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
232
233         spin_lock(&ofd->ofd_osfs_lock);
234         /* get available space from cached statfs data */
235         left = ofd->ofd_osfs.os_bavail << ofd->ofd_blockbits;
236         unstable = ofd->ofd_osfs_unstable; /* those might be accounted twice */
237         spin_unlock(&ofd->ofd_osfs_lock);
238
239         tot_granted = ofd->ofd_tot_granted;
240
241         if (left < tot_granted) {
242                 int mask = (left + unstable <
243                             tot_granted - ofd->ofd_tot_pending) ?
244                             D_ERROR : D_CACHE;
245
246                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left "LPU64" < tot_grant "
247                              LPU64" unstable "LPU64" pending "LPU64"\n",
248                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
249                              left, tot_granted, unstable,
250                              ofd->ofd_tot_pending);
251                 RETURN(0);
252         }
253
254         avail = left;
255         /* Withdraw space already granted to clients */
256         left -= tot_granted;
257
258         /* If the left space is below the grant threshold x available space,
259          * stop granting space to clients.
260          * The purpose of this threshold is to keep some error margin on the
261          * overhead estimate made by the OSD layer. If we grant all the free
262          * space, we have no way (grant space cannot be revoked yet) to
263          * adjust if the write overhead has been underestimated. */
264         left -= min_t(obd_size, left, ofd_grant_reserved(ofd, avail));
265
266         /* Align left on block size */
267         left &= ~((1ULL << ofd->ofd_blockbits) - 1);
268
269         CDEBUG(D_CACHE, "%s: cli %s/%p avail "LPU64" left "LPU64" unstable "
270                LPU64" tot_grant "LPU64" pending "LPU64"\n", obd->obd_name,
271                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
272                tot_granted, ofd->ofd_tot_pending);
273
274         RETURN(left);
275 }
276
277 /**
278  * Grab the dirty and seen grant announcements from the incoming obdo.
279  * We will later calculate the client's new grant and return it.
280  * Caller must hold ofd_grant_lock spinlock.
281  *
282  * \param env - is the lu environment supplying osfs storage
283  * \param exp - is the export for which we received the request
284  * \paral oa - is the incoming obdo sent by the client
285  */
286 static void ofd_grant_incoming(const struct lu_env *env, struct obd_export *exp,
287                                struct obdo *oa)
288 {
289         struct filter_export_data       *fed;
290         struct ofd_device               *ofd = ofd_exp(exp);
291         struct obd_device               *obd = exp->exp_obd;
292         long                             dirty, dropped, grant_chunk;
293         ENTRY;
294
295         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
296
297         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
298                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
299                 oa->o_valid &= ~OBD_MD_FLGRANT;
300                 RETURN_EXIT;
301         }
302
303         fed = &exp->exp_filter_data;
304
305         /* Add some margin, since there is a small race if other RPCs arrive
306          * out-or-order and have already consumed some grant.  We want to
307          * leave this here in case there is a large error in accounting. */
308         CDEBUG(D_CACHE,
309                "%s: cli %s/%p reports grant "LPU64" dropped %u, local %lu\n",
310                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
311                oa->o_dropped, fed->fed_grant);
312
313         if ((long long)oa->o_dirty < 0)
314                 oa->o_dirty = 0;
315
316         dirty       = ofd_grant_from_cli(exp, ofd, oa->o_dirty);
317         dropped     = ofd_grant_from_cli(exp, ofd, (obd_size)oa->o_dropped);
318         grant_chunk = ofd_grant_chunk(exp, ofd);
319
320         /* Update our accounting now so that statfs takes it into account.
321          * Note that fed_dirty is only approximate and can become incorrect
322          * if RPCs arrive out-of-order.  No important calculations depend
323          * on fed_dirty however, but we must check sanity to not assert. */
324         if (dirty > fed->fed_grant + 4 * grant_chunk)
325                 dirty = fed->fed_grant + 4 * grant_chunk;
326         ofd->ofd_tot_dirty += dirty - fed->fed_dirty;
327         if (fed->fed_grant < dropped) {
328                 CDEBUG(D_CACHE,
329                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
330                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
331                        fed->fed_grant);
332                 dropped = 0;
333         }
334         if (ofd->ofd_tot_granted < dropped) {
335                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant "LPU64
336                        "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
337                        dropped, ofd->ofd_tot_granted);
338                 dropped = 0;
339         }
340         ofd->ofd_tot_granted -= dropped;
341         fed->fed_grant -= dropped;
342         fed->fed_dirty = dirty;
343
344         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
345                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
346                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
347                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
348                 spin_unlock(&ofd->ofd_grant_lock);
349                 LBUG();
350         }
351         EXIT;
352 }
353
354 /**
355  * Called when the client is able to release some grants. Proceed with the
356  * shrink request when there is less ungranted space remaining
357  * than the amount all of the connected clients would consume if they
358  * used their full grant.
359  *
360  * \param exp - is the export for which we received the request
361  * \paral oa - is the incoming obdo sent by the client
362  * \param left_space - is the remaining free space with space already granted
363  *                   taken out
364  */
365 static void ofd_grant_shrink(struct obd_export *exp,
366                              struct obdo *oa, obd_size left_space)
367 {
368         struct filter_export_data       *fed;
369         struct ofd_device               *ofd = ofd_exp(exp);
370         struct obd_device               *obd = exp->exp_obd;
371         long                             grant_shrink;
372
373         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
374
375         if (left_space >= ofd->ofd_tot_granted_clients *
376                           OFD_GRANT_SHRINK_LIMIT)
377                 return;
378
379         grant_shrink = ofd_grant_from_cli(exp, ofd, oa->o_grant);
380
381         fed = &exp->exp_filter_data;
382         fed->fed_grant       -= grant_shrink;
383         ofd->ofd_tot_granted -= grant_shrink;
384
385         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld fed_grant %ld total "
386                LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
387                exp, grant_shrink, fed->fed_grant, ofd->ofd_tot_granted);
388
389         /* client has just released some grant, don't grant any space back */
390         oa->o_grant = 0;
391 }
392
393 /**
394  * Calculate how much space is required to write a given network buffer
395  */
396 static inline int ofd_grant_rnb_size(struct obd_export *exp,
397                                      struct ofd_device *ofd,
398                                      struct niobuf_remote *rnb)
399 {
400         obd_size blocksize, bytes, end;
401
402         if (exp && ofd_grant_compat(exp, ofd))
403                 blocksize = 1ULL << COMPAT_BSIZE_SHIFT;
404         else
405                 blocksize = 1ULL << ofd->ofd_blockbits;
406
407         /* The network buffer might span several blocks, align it on block
408          * boundaries */
409         bytes  = rnb->rnb_offset & (blocksize - 1);
410         bytes += rnb->rnb_len;
411         end    = bytes & (blocksize - 1);
412         if (end)
413                 bytes += blocksize - end;
414         if (exp)
415                 /* Apply per-export pecularities if one is given */
416                 bytes = ofd_grant_from_cli(exp, ofd, (obd_size)bytes);
417         return bytes;
418 }
419
420
421 /**
422  * When clients have dirtied as much space as they've been granted they
423  * fall through to sync writes.  These sync writes haven't been expressed
424  * in grants and need to error with ENOSPC when there isn't room in the
425  * filesystem for them after grants are taken into account.  However,
426  * writeback of the dirty data that was already granted space can write
427  * right on through.
428  * Caller must hold ofd_grant_lock spinlock.
429  *
430  * \param env - is the lu environment passed by the caller
431  * \param exp - is the export identifying the client which sent the RPC
432  * \param oa  - is the incoming obdo in which we should return the pack the
433  *            additional grant
434  * \param rnb - is the list of network buffers
435  * \param niocont - is the number of network buffers in the list
436  * \param left - is the remaining free space with space already granted
437  *             taken out
438  */
439 static void ofd_grant_check(const struct lu_env *env, struct obd_export *exp,
440                             struct obdo *oa, struct niobuf_remote *rnb,
441                             int niocount, obd_size *left)
442 {
443         struct filter_export_data       *fed = &exp->exp_filter_data;
444         struct obd_device               *obd = exp->exp_obd;
445         struct ofd_device               *ofd = ofd_exp(exp);
446         unsigned long                    ungranted = 0;
447         unsigned long                    granted = 0;
448         int                              i;
449         int                              resend = 0;
450         struct ofd_thread_info          *info = ofd_info(env);
451
452         ENTRY;
453
454         LASSERT_SPIN_LOCKED(&ofd->ofd_grant_lock);
455
456         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
457             (oa->o_flags & OBD_FL_RECOV_RESEND)) {
458                 resend = 1;
459                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
460                                 "accounting\n");
461         }
462
463         for (i = 0; i < niocount; i++) {
464                 int bytes;
465
466                 if (obd->obd_recovering) {
467                         /* Replaying write. Grant info have been processed
468                          * already so no need to do any enforcement here.
469                          * It is worth noting that only bulk writes with all
470                          * rnbs having OBD_BRW_FROM_GRANT can be replayed.
471                          * If one page hasn't OBD_BRW_FROM_GRANT set, then
472                          * the whole bulk is written synchronously */
473                         if (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT) {
474                                  rnb[i].rnb_flags |= OBD_BRW_GRANTED;
475                                  continue;
476                         } else {
477                                 CERROR("%s: cli %s is replaying OST_WRITE "
478                                        "while one rnb hasn't OBD_BRW_FROM_GRANT"
479                                        " set (0x%x)\n", exp->exp_obd->obd_name,
480                                         exp->exp_client_uuid.uuid,
481                                         rnb[i].rnb_flags);
482
483                         }
484                 } else if ((oa->o_valid & OBD_MD_FLGRANT) &&
485                            (rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
486                         if (resend) {
487                                 /* This is a recoverable resend so grant
488                                  * information have already been processed */
489                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
490                                 continue;
491                         }
492
493                         /* inflate consumed space if needed */
494                         bytes = ofd_grant_rnb_size(exp, ofd, &rnb[i]);
495                         if (fed->fed_grant < granted + bytes) {
496                                 CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d "
497                                        "GRANT, real grant %lu idx %d\n",
498                                        exp->exp_obd->obd_name,
499                                        exp->exp_client_uuid.uuid, exp,
500                                        granted, bytes, fed->fed_grant, i);
501                         } else {
502                                 granted += bytes;
503                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
504                                 continue;
505                         }
506                 }
507
508                 /* Consume grant space on the server.
509                  * Unlike above, ofd_grant_rnb_size() is called with exp = NULL
510                  * so that the required grant space isn't inflated. This is
511                  * done on purpose since the server can deal with large block
512                  * size, unlike some clients */
513                 bytes = ofd_grant_rnb_size(NULL, ofd, &rnb[i]);
514                 if (*left > ungranted + bytes) {
515                         /* if enough space, pretend it was granted */
516                         ungranted += bytes;
517                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
518                         continue;
519                 }
520
521                 /* We can't check for already-mapped blocks here (make sense
522                  * when backend filesystem does not use COW) as it requires
523                  * dropping the grant lock.
524                  * Instead, we clear ~OBD_BRW_GRANTED and in that case we need
525                  * to go through and verify if all of the blocks not marked
526                  *  BRW_GRANTED are already mapped and we can ignore this error.
527                  */
528                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
529                 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
530                                 exp->exp_obd->obd_name,
531                                 exp->exp_client_uuid.uuid, exp, i, bytes);
532         }
533
534         /* record space used for the I/O, will be used in ofd_grant_commmit() */
535         /* Now substract what the clients has used already.  We don't subtract
536          * this from the tot_granted yet, so that other client's can't grab
537          * that space before we have actually allocated our blocks. That
538          * happens in ofd_grant_commit() after the writes are done. */
539         info->fti_used = granted + ungranted;
540         *left -= ungranted;
541         fed->fed_grant -= granted;
542         fed->fed_pending += info->fti_used;
543         ofd->ofd_tot_granted += ungranted;
544         ofd->ofd_tot_pending += info->fti_used;
545
546         CDEBUG(D_CACHE,
547                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
548                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
549                granted, ungranted, fed->fed_grant, fed->fed_dirty);
550
551         if (obd->obd_recovering)
552                 /* don't update dirty accounting during recovery */
553                 RETURN_EXIT;
554
555         if (fed->fed_dirty < granted) {
556                 CWARN("%s: cli %s/%p claims granted %lu > fed_dirty %lu\n",
557                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
558                        granted, fed->fed_dirty);
559                 granted = fed->fed_dirty;
560         }
561         ofd->ofd_tot_dirty -= granted;
562         fed->fed_dirty -= granted;
563
564         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
565                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
566                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
567                        fed->fed_dirty, fed->fed_pending, fed->fed_grant);
568                 spin_unlock(&ofd->ofd_grant_lock);
569                 LBUG();
570         }
571         EXIT;
572 }
573
574 /**
575  * Calculate how much grant space to return to client, based on how much space
576  * is currently free and how much of that is already granted.
577  * Caller must hold ofd_grant_lock spinlock.
578  *
579  * \param exp - is the export of the client which sent the request
580  * \param curgrant - is the current grant claimed by the client
581  * \param want - is how much grant space the client would like to have
582  * \param left - is the remaining free space with granted space taken out
583  */
584 static long ofd_grant(struct obd_export *exp, obd_size curgrant,
585                       obd_size want, obd_size left)
586 {
587         struct obd_device               *obd = exp->exp_obd;
588         struct ofd_device               *ofd = ofd_exp(exp);
589         struct filter_export_data       *fed = &exp->exp_filter_data;
590         long                             grant_chunk;
591         obd_size                         grant;
592
593         ENTRY;
594
595         if (ofd_grant_prohibit(exp, ofd) || left == 0 || exp->exp_failed)
596                 RETURN(0);
597
598         if (want > 0x7fffffff) {
599                 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
600                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
601                 RETURN(0);
602         }
603
604         /* client not supporting OBD_CONNECT_GRANT_PARAM works with a 4KB block
605          * size while the reality is different */
606         curgrant    = ofd_grant_from_cli(exp, ofd, curgrant);
607         want    = ofd_grant_from_cli(exp, ofd, want);
608         grant_chunk = ofd_grant_chunk(exp, ofd);
609
610         /* Grant some fraction of the client's requested grant space so that
611          * they are not always waiting for write credits (not all of it to
612          * avoid overgranting in face of multiple RPCs in flight).  This
613          * essentially will be able to control the OSC_MAX_RIF for a client.
614          *
615          * If we do have a large disparity between what the client thinks it
616          * has and what we think it has, don't grant very much and let the
617          * client consume its grant first.  Either it just has lots of RPCs
618          * in flight, or it was evicted and its grants will soon be used up. */
619         if (curgrant >= want || curgrant >= fed->fed_grant + grant_chunk)
620                    RETURN(0);
621
622         if (!obd->obd_recovering)
623                 /* don't grant more than 1/8th of the remaining free space in
624                  * one chunk */
625                 left >>= 3;
626         grant = min(want, left);
627         /* align grant on block size */
628         grant &= ~((1ULL << ofd->ofd_blockbits) - 1);
629
630         if (!grant)
631                 RETURN(0);
632
633         /* Allow >OFD_GRANT_CHUNK size when clients reconnect due to a
634          * server reboot. */
635         if ((grant > grant_chunk) && (!obd->obd_recovering))
636                 grant = grant_chunk;
637
638         ofd->ofd_tot_granted += grant;
639         fed->fed_grant += grant;
640
641         if (fed->fed_grant < 0) {
642                 CERROR("%s: cli %s/%p grant %ld want "LPU64" current "LPU64"\n",
643                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
644                        fed->fed_grant, want, curgrant);
645                 spin_unlock(&ofd->ofd_grant_lock);
646                 LBUG();
647         }
648
649         CDEBUG(D_CACHE,
650                "%s: cli %s/%p wants: "LPU64" current grant "LPU64
651                " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
652                exp, want, curgrant, grant);
653         CDEBUG(D_CACHE,
654                "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
655                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
656                exp, ofd->ofd_tot_dirty, ofd->ofd_tot_granted,
657                obd->obd_num_exports);
658
659         RETURN(ofd_grant_to_cli(exp, ofd, grant));
660 }
661
662 /**
663  * Client connection or reconnection.
664  *
665  * \param env - is the lu environment provided by the caller
666  * \param exp - is the client's export which is reconnecting
667  * \param want - is how much the client would like to get
668  */
669 long ofd_grant_connect(const struct lu_env *env, struct obd_export *exp,
670                        obd_size want)
671 {
672         struct ofd_device               *ofd = ofd_exp(exp);
673         struct filter_export_data       *fed = &exp->exp_filter_data;
674         obd_size                         left = 0;
675         long                             grant;
676         int                              from_cache;
677         int                              force = 0; /* can use cached data */
678
679         /* don't grant space to client with read-only access */
680         if ((exp->exp_connect_flags & OBD_CONNECT_RDONLY) ||
681             ofd_grant_prohibit(exp, ofd))
682                 return 0;
683
684 refresh:
685         ofd_grant_statfs(env, exp, force, &from_cache);
686
687         spin_lock(&ofd->ofd_grant_lock);
688
689         /* Grab free space from cached info and take out space already granted
690          * to clients as well as reserved space */
691         left = ofd_grant_space_left(exp);
692
693         /* get fresh statfs data if we are short in ungranted space */
694         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
695                 spin_unlock(&ofd->ofd_grant_lock);
696                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
697                 force = 1;
698                 goto refresh;
699         }
700
701         ofd_grant(exp, ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant),
702                   want, left);
703
704         /* return to client its current grant */
705         grant = ofd_grant_to_cli(exp, ofd, (obd_size)fed->fed_grant);
706         ofd->ofd_tot_granted_clients++;
707
708         spin_unlock(&ofd->ofd_grant_lock);
709
710         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: "LPU64" left: "
711                LPU64"\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
712                exp, grant, want, left);
713
714         return grant;
715 }
716
717 /**
718  * Remove a client from the grant accounting totals.  We also remove
719  * the export from the obd device under the osfs and dev locks to ensure
720  * that the ofd_grant_sanity_check() calculations are always valid.
721  * The client should do something similar when it invalidates its import.
722  *
723  * \param exp - is the client's export to remove from grant accounting
724  */
725 void ofd_grant_discard(struct obd_export *exp)
726 {
727         struct obd_device               *obd = exp->exp_obd;
728         struct ofd_device               *ofd = ofd_exp(exp);
729         struct filter_export_data       *fed = &exp->exp_filter_data;
730
731         spin_lock(&ofd->ofd_grant_lock);
732         LASSERTF(ofd->ofd_tot_granted >= fed->fed_grant,
733                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
734                  obd->obd_name, ofd->ofd_tot_granted,
735                  exp->exp_client_uuid.uuid, exp, fed->fed_grant);
736         ofd->ofd_tot_granted -= fed->fed_grant;
737         fed->fed_grant = 0;
738         LASSERTF(ofd->ofd_tot_pending >= fed->fed_pending,
739                  "%s: tot_pending "LPU64" cli %s/%p fed_pending %ld\n",
740                  obd->obd_name, ofd->ofd_tot_pending,
741                  exp->exp_client_uuid.uuid, exp, fed->fed_pending);
742         /* ofd_tot_pending is handled in ofd_grant_commit as bulk
743          * finishes */
744         LASSERTF(ofd->ofd_tot_dirty >= fed->fed_dirty,
745                  "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %ld\n",
746                  obd->obd_name, ofd->ofd_tot_dirty,
747                  exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
748         ofd->ofd_tot_dirty -= fed->fed_dirty;
749         fed->fed_dirty = 0;
750         spin_unlock(&ofd->ofd_grant_lock);
751 }
752
753 /**
754  * Called at prepare time when handling read request. This function extracts
755  * incoming grant information from the obdo and processes the grant shrink
756  * request, if any.
757  *
758  * \param env - is the lu environment provided by the caller
759  * \param exp - is the export of the client which sent the request
760  * \paral oa - is the incoming obdo sent by the client
761  */
762 void ofd_grant_prepare_read(const struct lu_env *env,
763                             struct obd_export *exp, struct obdo *oa)
764 {
765         struct ofd_device       *ofd = ofd_exp(exp);
766         int                      do_shrink;
767         obd_size                 left = 0;
768
769         if (!oa)
770                 return;
771
772         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
773                 /* The read request does not contain any grant
774                  * information */
775                 return;
776
777         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
778             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
779                 /* To process grant shrink request, we need to know how much
780                  * available space remains on the backend filesystem.
781                  * Shrink requests are not so common, we always get fresh
782                  * statfs information. */
783                 ofd_grant_statfs(env, exp, 1, NULL);
784
785                 /* protect all grant counters */
786                 spin_lock(&ofd->ofd_grant_lock);
787
788                 /* Grab free space from cached statfs data and take out space
789                  * already granted to clients as well as reserved space */
790                 left = ofd_grant_space_left(exp);
791
792                 /* all set now to proceed with shrinking */
793                 do_shrink = 1;
794         } else {
795                 /* no grant shrinking request packed in the obdo and
796                  * since we don't grant space back on reads, no point
797                  * in running statfs, so just skip it and process
798                  * incoming grant data directly. */
799                 spin_lock(&ofd->ofd_grant_lock);
800                 do_shrink = 0;
801         }
802
803         /* extract incoming grant infomation provided by the client */
804         ofd_grant_incoming(env, exp, oa);
805
806         /* unlike writes, we don't return grants back on reads unless a grant
807          * shrink request was packed and we decided to turn it down. */
808         if (do_shrink)
809                 ofd_grant_shrink(exp, oa, left);
810         else
811                 oa->o_grant = 0;
812
813         spin_unlock(&ofd->ofd_grant_lock);
814 }
815
816 /**
817  * Called at write prepare time to handle incoming grant, check that we have
818  * enough space and grant some space back to the client if possible.
819  *
820  * \param env - is the lu environment provided by the caller
821  * \param exp - is the export of the client which sent the request
822  * \paral oa - is the incoming obdo sent by the client
823  * \param rnb - is the list of network buffers
824  * \param niocont - is the number of network buffers in the list
825  */
826 void ofd_grant_prepare_write(const struct lu_env *env,
827                              struct obd_export *exp, struct obdo *oa,
828                              struct niobuf_remote *rnb, int niocount)
829 {
830         struct obd_device       *obd = exp->exp_obd;
831         struct ofd_device       *ofd = ofd_exp(exp);
832         obd_size                 left;
833         int                      from_cache;
834         int                      force = 0; /* can use cached data intially */
835         int                      rc;
836
837         ENTRY;
838
839 refresh:
840         /* get statfs information from OSD layer */
841         ofd_grant_statfs(env, exp, force, &from_cache);
842
843         spin_lock(&ofd->ofd_grant_lock); /* protect all grant counters */
844
845         /* Grab free space from cached statfs data and take out space already
846          * granted to clients as well as reserved space */
847         left = ofd_grant_space_left(exp);
848
849         /* Get fresh statfs data if we are short in ungranted space */
850         if (from_cache && left < 32 * ofd_grant_chunk(exp, ofd)) {
851                 spin_unlock(&ofd->ofd_grant_lock);
852                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
853                        obd->obd_name);
854                 force = 1;
855                 goto refresh;
856         }
857
858         /* When close to free space exhaustion, trigger a sync to force
859          * writeback cache to consume required space immediately and release as
860          * much space as possible. */
861         if (!obd->obd_recovering && force != 2 &&
862             left < ofd_grant_chunk(NULL, ofd)) {
863                 bool from_grant = true;
864                 int  i;
865
866                 /* That said, it is worth running a sync only if some pages did
867                  * not consume grant space on the client and could thus fail
868                  * with ENOSPC later in ofd_grant_check() */
869                 for (i = 0; i < niocount; i++)
870                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
871                                 from_grant = false;
872
873                 if (!from_grant) {
874                         /* at least one network buffer requires acquiring grant
875                          * space on the server */
876                         spin_unlock(&ofd->ofd_grant_lock);
877                         /* discard errors, at least we tried ... */
878                         rc = dt_sync(env, ofd->ofd_osd);
879                         force = 2;
880                         goto refresh;
881                 }
882         }
883
884         /* extract incoming grant information provided by the client */
885         ofd_grant_incoming(env, exp, oa);
886
887         /* check limit */
888         ofd_grant_check(env, exp, oa, rnb, niocount, &left);
889
890         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
891                 spin_unlock(&ofd->ofd_grant_lock);
892                 RETURN_EXIT;
893         }
894
895         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
896          * grant space. */
897         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
898             (oa->o_flags & OBD_FL_SHRINK_GRANT))
899                 ofd_grant_shrink(exp, oa, left);
900         else
901                 /* grant more space back to the client if possible */
902                 oa->o_grant = ofd_grant(exp, oa->o_grant, oa->o_undirty, left);
903         spin_unlock(&ofd->ofd_grant_lock);
904 }
905
906 /**
907  * Called during object precreation to consume grant space.
908  * More space is granted for precreation if possible.
909  *
910  * \param env - is the lu environment provided by the caller
911  * \param exp - is the export holding the grant space for precreation (= self
912  *            export currently)
913  * \paral nr - is the number of objects the caller wants to create objects
914  */
915 int ofd_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
916 {
917         struct ofd_thread_info          *info = ofd_info(env);
918         struct ofd_device               *ofd = ofd_exp(exp);
919         struct filter_export_data       *fed = &exp->exp_filter_data;
920         obd_size                         left = 0;
921         unsigned long                    wanted;
922
923         ENTRY;
924
925         info->fti_used = 0;
926
927         if (exp->exp_obd->obd_recovering ||
928             ofd->ofd_dt_conf.ddp_inodespace == 0)
929                 /* don't enforce grant during recovery */
930                 RETURN(0);
931
932         /* Update statfs data if required */
933         ofd_grant_statfs(env, exp, 1, NULL);
934
935         /* protect all grant counters */
936         spin_lock(&ofd->ofd_grant_lock);
937
938         /* fail precreate request if there is not enough blocks available for
939          * writing */
940         if (ofd->ofd_osfs.os_bavail - (fed->fed_grant >> ofd->ofd_blockbits) <
941             (ofd->ofd_osfs.os_blocks >> 10)) {
942                 spin_unlock(&ofd->ofd_grant_lock);
943                 CDEBUG(D_RPCTRACE, "%s: not enough space for create "LPU64"\n",
944                        ofd_obd(ofd)->obd_name,
945                        ofd->ofd_osfs.os_bavail * ofd->ofd_osfs.os_blocks);
946                 RETURN(-ENOSPC);
947         }
948
949         /* Grab free space from cached statfs data and take out space
950          * already granted to clients as well as reserved space */
951         left = ofd_grant_space_left(exp);
952
953         /* compute how much space is required to handle the precreation
954          * request */
955         wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
956         if (wanted > fed->fed_grant + left) {
957                 /* that's beyond what remains, adjust the number of objects that
958                  * can be safely precreated */
959                 wanted = fed->fed_grant + left;
960                 *nr = wanted / ofd->ofd_dt_conf.ddp_inodespace;
961                 if (*nr == 0) {
962                         /* we really have no space any more for precreation,
963                          * fail the precreate request with ENOSPC */
964                         spin_unlock(&ofd->ofd_grant_lock);
965                         RETURN(-ENOSPC);
966                 }
967                 /* compute space needed for the new number of creations */
968                 wanted = *nr * ofd->ofd_dt_conf.ddp_inodespace;
969         }
970         LASSERT(wanted <= fed->fed_grant + left);
971
972         if (wanted <= fed->fed_grant) {
973                 /* we've enough grant space to handle this precreate request */
974                 fed->fed_grant -= wanted;
975         } else {
976                 /* we need to take some space from the ungranted pool */
977                 ofd->ofd_tot_granted += wanted - fed->fed_grant;
978                 left -= wanted - fed->fed_grant;
979                 fed->fed_grant = 0;
980         }
981         info->fti_used = wanted;
982         fed->fed_pending += info->fti_used;
983         ofd->ofd_tot_pending += info->fti_used;
984
985         /* grant more space (twice as much as needed for this request) for
986          * precreate purpose if possible */
987         ofd_grant(exp, fed->fed_grant, wanted * 2, left);
988         spin_unlock(&ofd->ofd_grant_lock);
989         RETURN(0);
990 }
991
992 /**
993  * Called at commit time to update pending grant counter for writes in flight
994  *
995  * \param env - is the lu environment provided by the caller
996  * \param exp - is the export of the client which sent the request
997  */
998 void ofd_grant_commit(const struct lu_env *env, struct obd_export *exp,
999                       int rc)
1000 {
1001         struct ofd_device       *ofd  = ofd_exp(exp);
1002         struct ofd_thread_info  *info = ofd_info(env);
1003         unsigned long            pending;
1004
1005         ENTRY;
1006
1007         /* get space accounted in tot_pending for the I/O, set in
1008          * ofd_grant_check() */
1009         pending = info->fti_used;
1010         if (pending == 0)
1011                 RETURN_EXIT;
1012
1013         spin_lock(&ofd->ofd_grant_lock);
1014         /* Don't update statfs data for errors raised before commit (e.g.
1015          * bulk transfer failed, ...) since we know those writes have not been
1016          * processed. For other errors hit during commit, we cannot really tell
1017          * whether or not something was written, so we update statfs data.
1018          * In any case, this should not be fatal since we always get fresh
1019          * statfs data before failing a request with ENOSPC */
1020         if (rc == 0) {
1021                 spin_lock(&ofd->ofd_osfs_lock);
1022                 /* Take pending out of cached statfs data */
1023                 ofd->ofd_osfs.os_bavail -= min_t(obd_size,
1024                                                  ofd->ofd_osfs.os_bavail,
1025                                                  pending >> ofd->ofd_blockbits);
1026                 if (ofd->ofd_statfs_inflight)
1027                         /* someone is running statfs and want to be notified of
1028                          * writes happening meanwhile */
1029                         ofd->ofd_osfs_inflight += pending;
1030                 spin_unlock(&ofd->ofd_osfs_lock);
1031         }
1032
1033         if (exp->exp_filter_data.fed_pending < pending) {
1034                 CERROR("%s: cli %s/%p fed_pending(%lu) < grant_used(%lu)\n",
1035                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1036                        exp->exp_filter_data.fed_pending, pending);
1037                 spin_unlock(&ofd->ofd_grant_lock);
1038                 LBUG();
1039         }
1040         exp->exp_filter_data.fed_pending -= pending;
1041
1042         if (ofd->ofd_tot_granted < pending) {
1043                  CERROR("%s: cli %s/%p tot_granted("LPU64") < grant_used(%lu)"
1044                         "\n", exp->exp_obd->obd_name,
1045                         exp->exp_client_uuid.uuid, exp, ofd->ofd_tot_granted,
1046                         pending);
1047                 spin_unlock(&ofd->ofd_grant_lock);
1048                 LBUG();
1049         }
1050         ofd->ofd_tot_granted -= pending;
1051
1052         if (ofd->ofd_tot_pending < pending) {
1053                  CERROR("%s: cli %s/%p tot_pending("LPU64") < grant_used(%lu)"
1054                         "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1055                         exp, ofd->ofd_tot_pending, pending);
1056                 spin_unlock(&ofd->ofd_grant_lock);
1057                 LBUG();
1058         }
1059         ofd->ofd_tot_pending -= pending;
1060         spin_unlock(&ofd->ofd_grant_lock);
1061         EXIT;
1062 }