Whamcloud - gitweb
LU-9183 libcfs: handle dump_trace() and related callbacks removal
[fs/lustre-release.git] / lustre / target / tgt_grant.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * lustre/target/tgt_grant.c
30  *
31  * This file provides code related to grant space management on Lustre Targets
32  * (OSTs and MDTs). Grant is a mechanism used by client nodes to reserve disk
33  * space on a target for the data writeback cache. The Lustre client is thus
34  * assured that enough space will be available when flushing dirty pages
35  * asynchronously. Each client node is granted an initial amount of reserved
36  * space at connect time and gets additional space back from target in bulk
37  * write reply.
38  *
39  * We actually support three different cases:
40  * - The client supports the new grant parameters (i.e. OBD_CONNECT_GRANT_PARAM)
41  *   which means that all grant overhead calculation happens on the client side.
42  *   The server reports at connect time the backend filesystem block size, the
43  *   maximum extent size as well as the extent insertion cost and it is then up
44  *   to the osc layer to the track dirty extents and consume grant accordingly
45  *   (see osc_cache.c). In each bulk write request, the client provides how much
46  *   grant space was consumed for this RPC.
47  * - The client does not support OBD_CONNECT_GRANT_PARAM and always assumes a
48  *   a backend file system block size of 4KB. We then have two cases:
49  *   - If the block size is really 4KB, then the client can deal with grant
50  *     allocation for partial block writes, but won't take extent insertion cost
51  *     into account. For such clients, we inflate grant by 100% on the server
52  *     side. It means that when 32MB of grant is hold by the client, 64MB of
53  *     grant space is actually reserved on the server. All grant counters
54  *     provided by such a client are inflated by 100%.
55  *   - The backend filesystem block size is bigger than 4KB, which isn't
56  *     supported by the client. In this case, we emulate a 4KB block size and
57  *     consume one block size on the server for each 4KB of grant returned to
58  *     client. With a 128KB blocksize, it means that 32MB dirty pages of 4KB
59  *     on the client will actually consume 1GB of grant on the server.
60  *     All grant counters provided by such a client are inflated by the block
61  *     size ratio.
62  *
63  * This file handles the core logic for:
64  * - grant allocation strategy
65  * - maintaining per-client as well as global grant space accounting
66  * - processing grant information packed in incoming requests
67  * - allocating server-side grant space for synchronous write RPCs which did not
68  *   consume grant on the client side (OBD_BRW_FROM_GRANT flag not set). If not
69  *   enough space is available, such RPCs fail with ENOSPC
70  *
71  * Author: Johann Lombardi <johann.lombardi@intel.com>
72  */
73
74 #define DEBUG_SUBSYSTEM S_FILTER
75
76 #include <obd.h>
77 #include <obd_class.h>
78
79 #include "tgt_internal.h"
80
81 /* Clients typically hold 2x their max_rpcs_in_flight of grant space */
82 #define TGT_GRANT_SHRINK_LIMIT(exp)     (2ULL * 8 * exp_max_brw_size(exp))
83
84 /* Helpers to inflate/deflate grants for clients that do not support the grant
85  * parameters */
86 static inline u64 tgt_grant_inflate(struct tg_grants_data *tgd, u64 val)
87 {
88         if (tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT)
89                 /* Client does not support such large block size, grant
90                  * is thus inflated. We already significantly overestimate
91                  * overhead, no need to add the extent tax in this case */
92                 return val << (tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT);
93         /* client can deal with the block size, but does not support per-extent
94          * grant accounting, inflate grant by 100% for such clients */
95         return val << 1;
96 }
97
98 /* Companion of tgt_grant_inflate() */
99 static inline u64 tgt_grant_deflate(struct tg_grants_data *tgd, u64 val)
100 {
101         if (tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT)
102                 return val >> (tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT);
103         return val >> 1;
104 }
105
106 /* Grant chunk is used as a unit for grant allocation. It should be inflated
107  * if the client does not support the grant paramaters.
108  * Check connection flag against \a data if not NULL. This is used during
109  * connection creation where exp->exp_connect_data isn't populated yet */
110 static inline u64 tgt_grant_chunk(struct obd_export *exp,
111                                   struct lu_target *lut,
112                                   struct obd_connect_data *data)
113 {
114         struct tg_grants_data *tgd = &lut->lut_tgd;
115         u64 chunk = exp_max_brw_size(exp);
116         u64 tax;
117
118         if (exp->exp_obd->obd_self_export == exp)
119                 /* Grant enough space to handle a big precreate request */
120                 return OST_MAX_PRECREATE * lut->lut_dt_conf.ddp_inodespace / 2;
121
122         if ((data == NULL && !(exp_grant_param_supp(exp))) ||
123             (data != NULL && !OCD_HAS_FLAG(data, GRANT_PARAM)))
124                 /* Try to grant enough space to send a full-size RPC */
125                 return tgt_grant_inflate(tgd, chunk);
126
127         /* Try to return enough to send two full-size RPCs
128          * = 2 * (BRW_size + #extents_in_BRW * grant_tax) */
129         tax = 1ULL << tgd->tgd_blockbits;            /* block size */
130         tax *= lut->lut_dt_conf.ddp_max_extent_blks; /* max extent size */
131         tax = (chunk + tax - 1) / tax;               /* #extents in a RPC */
132         tax *= lut->lut_dt_conf.ddp_extent_tax;      /* extent tax for a RPC */
133         chunk = (chunk + tax) * 2;                   /* we said two full RPCs */
134         return chunk;
135 }
136
137 static int tgt_check_export_grants(struct obd_export *exp, u64 *dirty,
138                                    u64 *pending, u64 *granted, u64 maxsize)
139 {
140         struct tg_export_data *ted = &exp->exp_target_data;
141         int level = D_CACHE;
142
143         if (exp->exp_obd->obd_self_export == exp)
144                 CDEBUG(D_CACHE, "%s: processing self export: %ld %ld "
145                        "%ld\n", exp->exp_obd->obd_name, ted->ted_grant,
146                        ted->ted_pending, ted->ted_dirty);
147
148         if (ted->ted_grant < 0 || ted->ted_pending < 0 || ted->ted_dirty < 0)
149                 level = D_ERROR;
150         CDEBUG_LIMIT(level, "%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
151                      exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
152                      ted->ted_dirty, ted->ted_pending, ted->ted_grant);
153
154         if (ted->ted_grant + ted->ted_pending > maxsize) {
155                 CERROR("%s: cli %s/%p ted_grant(%ld) + ted_pending(%ld)"
156                         " > maxsize(%llu)\n", exp->exp_obd->obd_name,
157                         exp->exp_client_uuid.uuid, exp, ted->ted_grant,
158                         ted->ted_pending, maxsize);
159                 return -EFAULT;
160         }
161         if (ted->ted_dirty > maxsize) {
162                 CERROR("%s: cli %s/%p ted_dirty(%ld) > maxsize(%llu)\n",
163                         exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
164                         exp, ted->ted_dirty, maxsize);
165                 return -EFAULT;
166         }
167         *granted += ted->ted_grant + ted->ted_pending;
168         *pending += ted->ted_pending;
169         *dirty += ted->ted_dirty;
170         return 0;
171 }
172
173 /**
174  * Perform extra sanity checks for grant accounting.
175  *
176  * This function scans the export list, sanity checks per-export grant counters
177  * and verifies accuracy of global grant accounting. If an inconsistency is
178  * found, a CERROR is printed with the function name \func that was passed as
179  * argument. LBUG is only called in case of serious counter corruption (i.e.
180  * value larger than the device size).
181  * Those sanity checks can be pretty expensive and are disabled if the OBD
182  * device has more than 100 connected exports.
183  *
184  * \param[in] obd       OBD device for which grant accounting should be
185  *                      verified
186  * \param[in] func      caller's function name
187  */
188 void tgt_grant_sanity_check(struct obd_device *obd, const char *func)
189 {
190         struct lu_target *lut = obd->u.obt.obt_lut;
191         struct tg_grants_data *tgd = &lut->lut_tgd;
192         struct obd_export *exp;
193         u64                maxsize;
194         u64                tot_dirty = 0;
195         u64                tot_pending = 0;
196         u64                tot_granted = 0;
197         u64                fo_tot_granted;
198         u64                fo_tot_pending;
199         u64                fo_tot_dirty;
200         int                error;
201
202         if (list_empty(&obd->obd_exports))
203                 return;
204
205         /* We don't want to do this for large machines that do lots of
206          * mounts or unmounts.  It burns... */
207         if (obd->obd_num_exports > 100)
208                 return;
209
210         maxsize = tgd->tgd_osfs.os_blocks << tgd->tgd_blockbits;
211
212         spin_lock(&obd->obd_dev_lock);
213         spin_lock(&tgd->tgd_grant_lock);
214         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
215                 error = tgt_check_export_grants(exp, &tot_dirty, &tot_pending,
216                                                 &tot_granted, maxsize);
217                 if (error < 0) {
218                         spin_unlock(&obd->obd_dev_lock);
219                         spin_unlock(&tgd->tgd_grant_lock);
220                         LBUG();
221                 }
222         }
223
224         /* exports about to be unlinked should also be taken into account since
225          * they might still hold pending grant space to be released at
226          * commit time */
227         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain) {
228                 error = tgt_check_export_grants(exp, &tot_dirty, &tot_pending,
229                                                 &tot_granted, maxsize);
230                 if (error < 0) {
231                         spin_unlock(&obd->obd_dev_lock);
232                         spin_unlock(&tgd->tgd_grant_lock);
233                         LBUG();
234                 }
235         }
236
237         fo_tot_granted = tgd->tgd_tot_granted;
238         fo_tot_pending = tgd->tgd_tot_pending;
239         fo_tot_dirty = tgd->tgd_tot_dirty;
240         spin_unlock(&obd->obd_dev_lock);
241         spin_unlock(&tgd->tgd_grant_lock);
242
243         if (tot_granted != fo_tot_granted)
244                 CERROR("%s: tot_granted %llu != fo_tot_granted %llu\n",
245                        func, tot_granted, fo_tot_granted);
246         if (tot_pending != fo_tot_pending)
247                 CERROR("%s: tot_pending %llu != fo_tot_pending %llu\n",
248                        func, tot_pending, fo_tot_pending);
249         if (tot_dirty != fo_tot_dirty)
250                 CERROR("%s: tot_dirty %llu != fo_tot_dirty %llu\n",
251                        func, tot_dirty, fo_tot_dirty);
252         if (tot_pending > tot_granted)
253                 CERROR("%s: tot_pending %llu > tot_granted %llu\n",
254                        func, tot_pending, tot_granted);
255         if (tot_granted > maxsize)
256                 CERROR("%s: tot_granted %llu > maxsize %llu\n",
257                        func, tot_granted, maxsize);
258         if (tot_dirty > maxsize)
259                 CERROR("%s: tot_dirty %llu > maxsize %llu\n",
260                        func, tot_dirty, maxsize);
261 }
262 EXPORT_SYMBOL(tgt_grant_sanity_check);
263
264 /**
265  * Get file system statistics of target.
266  *
267  * Helper function for statfs(), also used by grant code.
268  * Implements caching for statistics to avoid calling OSD device each time.
269  *
270  * \param[in]  env        execution environment
271  * \param[in]  lut        LU target
272  * \param[out] osfs       statistic data to return
273  * \param[in]  max_age    maximum age for cached data
274  * \param[in]  from_cache show that data was get from cache or not
275  *
276  * \retval              0 if successful
277  * \retval              negative value on error
278  */
279 int tgt_statfs_internal(const struct lu_env *env, struct lu_target *lut,
280                         struct obd_statfs *osfs, __u64 max_age, int *from_cache)
281 {
282         struct tg_grants_data *tgd = &lut->lut_tgd;
283         int rc = 0;
284         ENTRY;
285
286         spin_lock(&tgd->tgd_osfs_lock);
287         if (cfs_time_before_64(tgd->tgd_osfs_age, max_age) || max_age == 0) {
288                 u64 unstable;
289
290                 /* statfs data are too old, get up-to-date one.
291                  * we must be cautious here since multiple threads might be
292                  * willing to update statfs data concurrently and we must
293                  * grant that cached statfs data are always consistent */
294
295                 if (tgd->tgd_statfs_inflight == 0)
296                         /* clear inflight counter if no users, although it would
297                          * take a while to overflow this 64-bit counter ... */
298                         tgd->tgd_osfs_inflight = 0;
299                 /* notify tgt_grant_commit() that we want to track writes
300                  * completed as of now */
301                 tgd->tgd_statfs_inflight++;
302                 /* record value of inflight counter before running statfs to
303                  * compute the diff once statfs is completed */
304                 unstable = tgd->tgd_osfs_inflight;
305                 spin_unlock(&tgd->tgd_osfs_lock);
306
307                 /* statfs can sleep ... hopefully not for too long since we can
308                  * call it fairly often as space fills up */
309                 rc = dt_statfs(env, lut->lut_bottom, osfs);
310                 if (unlikely(rc))
311                         GOTO(out, rc);
312
313                 spin_lock(&tgd->tgd_grant_lock);
314                 spin_lock(&tgd->tgd_osfs_lock);
315                 /* calculate how much space was written while we released the
316                  * tgd_osfs_lock */
317                 unstable = tgd->tgd_osfs_inflight - unstable;
318                 tgd->tgd_osfs_unstable = 0;
319                 if (unstable) {
320                         /* some writes committed while we were running statfs
321                          * w/o the tgd_osfs_lock. Those ones got added to
322                          * the cached statfs data that we are about to crunch.
323                          * Take them into account in the new statfs data */
324                         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
325                                                unstable >> tgd->tgd_blockbits);
326                         /* However, we don't really know if those writes got
327                          * accounted in the statfs call, so tell
328                          * tgt_grant_space_left() there is some uncertainty
329                          * on the accounting of those writes.
330                          * The purpose is to prevent spurious error messages in
331                          * tgt_grant_space_left() since those writes might be
332                          * accounted twice. */
333                         tgd->tgd_osfs_unstable += unstable;
334                 }
335                 /* similarly, there is some uncertainty on write requests
336                  * between prepare & commit */
337                 tgd->tgd_osfs_unstable += tgd->tgd_tot_pending;
338                 spin_unlock(&tgd->tgd_grant_lock);
339
340                 /* finally udpate cached statfs data */
341                 tgd->tgd_osfs = *osfs;
342                 tgd->tgd_osfs_age = cfs_time_current_64();
343
344                 tgd->tgd_statfs_inflight--; /* stop tracking */
345                 if (tgd->tgd_statfs_inflight == 0)
346                         tgd->tgd_osfs_inflight = 0;
347                 spin_unlock(&tgd->tgd_osfs_lock);
348
349                 if (from_cache)
350                         *from_cache = 0;
351         } else {
352                 /* use cached statfs data */
353                 *osfs = tgd->tgd_osfs;
354                 spin_unlock(&tgd->tgd_osfs_lock);
355                 if (from_cache)
356                         *from_cache = 1;
357         }
358         GOTO(out, rc);
359
360 out:
361         return rc;
362 }
363 EXPORT_SYMBOL(tgt_statfs_internal);
364
365 /**
366  * Update cached statfs information from the OSD layer
367  *
368  * Refresh statfs information cached in tgd::tgd_osfs if the cache is older
369  * than 1s or if force is set. The OSD layer is in charge of estimating data &
370  * metadata overhead.
371  * This function can sleep so it should not be called with any spinlock held.
372  *
373  * \param[in] env               LU environment passed by the caller
374  * \param[in] exp               export used to print client info in debug
375  *                              messages
376  * \param[in] force             force a refresh of statfs information
377  * \param[out] from_cache       returns whether the statfs information are
378  *                              taken from cache
379  */
380 static void tgt_grant_statfs(const struct lu_env *env, struct obd_export *exp,
381                              int force, int *from_cache)
382 {
383         struct obd_device       *obd = exp->exp_obd;
384         struct lu_target        *lut = obd->u.obt.obt_lut;
385         struct tg_grants_data   *tgd = &lut->lut_tgd;
386         struct tgt_thread_info  *tti;
387         struct obd_statfs       *osfs;
388         __u64                    max_age;
389         int                      rc;
390
391         if (force)
392                 max_age = 0; /* get fresh statfs data */
393         else
394                 max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
395
396         tti = tgt_th_info(env);
397         osfs = &tti->tti_u.osfs;
398         rc = tgt_statfs_internal(env, lut, osfs, max_age, from_cache);
399         if (unlikely(rc)) {
400                 if (from_cache)
401                         *from_cache = 0;
402                 return;
403         }
404
405         CDEBUG(D_CACHE, "%s: cli %s/%p free: %llu avail: %llu\n",
406                obd->obd_name, exp->exp_client_uuid.uuid, exp,
407                osfs->os_bfree << tgd->tgd_blockbits,
408                osfs->os_bavail << tgd->tgd_blockbits);
409 }
410
411 /**
412  * Figure out how much space is available on the backend filesystem after
413  * removing grant space already booked by clients.
414  *
415  * This is done by accessing cached statfs data previously populated by
416  * tgt_grant_statfs(), from which we withdraw the space already granted to
417  * clients and the reserved space.
418  * Caller must hold tgd_grant_lock spinlock.
419  *
420  * \param[in] exp       export associated with the device for which the amount
421  *                      of available space is requested
422  * \retval              amount of non-allocated space, in bytes
423  */
424 static u64 tgt_grant_space_left(struct obd_export *exp)
425 {
426         struct obd_device       *obd = exp->exp_obd;
427         struct lu_target        *lut = obd->u.obt.obt_lut;
428         struct tg_grants_data   *tgd = &lut->lut_tgd;
429         u64                      tot_granted;
430         u64                      left;
431         u64                      avail;
432         u64                      unstable;
433
434         ENTRY;
435         assert_spin_locked(&tgd->tgd_grant_lock);
436
437         spin_lock(&tgd->tgd_osfs_lock);
438         /* get available space from cached statfs data */
439         left = tgd->tgd_osfs.os_bavail << tgd->tgd_blockbits;
440         unstable = tgd->tgd_osfs_unstable; /* those might be accounted twice */
441         spin_unlock(&tgd->tgd_osfs_lock);
442
443         tot_granted = tgd->tgd_tot_granted;
444
445         if (left < tot_granted) {
446                 int mask = (left + unstable <
447                             tot_granted - tgd->tgd_tot_pending) ?
448                             D_ERROR : D_CACHE;
449
450                 CDEBUG_LIMIT(mask, "%s: cli %s/%p left %llu < tot_grant "
451                              "%llu unstable %llu pending %llu "
452                              "dirty %llu\n",
453                              obd->obd_name, exp->exp_client_uuid.uuid, exp,
454                              left, tot_granted, unstable,
455                              tgd->tgd_tot_pending,
456                              tgd->tgd_tot_dirty);
457                 RETURN(0);
458         }
459
460         avail = left;
461         /* Withdraw space already granted to clients */
462         left -= tot_granted;
463
464         /* Align left on block size */
465         left &= ~((1ULL << tgd->tgd_blockbits) - 1);
466
467         CDEBUG(D_CACHE, "%s: cli %s/%p avail %llu left %llu unstable "
468                "%llu tot_grant %llu pending %llu\n", obd->obd_name,
469                exp->exp_client_uuid.uuid, exp, avail, left, unstable,
470                tot_granted, tgd->tgd_tot_pending);
471
472         RETURN(left);
473 }
474
475 /**
476  * Process grant information from obdo structure packed in incoming BRW
477  * and inflate grant counters if required.
478  *
479  * Grab the dirty and seen grant announcements from the incoming obdo and
480  * inflate all grant counters passed in the request if the client does not
481  * support the grant parameters.
482  * We will later calculate the client's new grant and return it.
483  * Caller must hold tgd_grant_lock spinlock.
484  *
485  * \param[in] env       LU environment supplying osfs storage
486  * \param[in] exp       export for which we received the request
487  * \param[in,out] oa    incoming obdo sent by the client
488  */
489 static void tgt_grant_incoming(const struct lu_env *env, struct obd_export *exp,
490                                struct obdo *oa, long chunk)
491 {
492         struct tg_export_data   *ted = &exp->exp_target_data;
493         struct obd_device       *obd = exp->exp_obd;
494         struct tg_grants_data   *tgd = &obd->u.obt.obt_lut->lut_tgd;
495         long                     dirty;
496         long                     dropped;
497         ENTRY;
498
499         assert_spin_locked(&tgd->tgd_grant_lock);
500
501         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
502                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
503                 oa->o_valid &= ~OBD_MD_FLGRANT;
504                 RETURN_EXIT;
505         }
506
507         /* Add some margin, since there is a small race if other RPCs arrive
508          * out-or-order and have already consumed some grant.  We want to
509          * leave this here in case there is a large error in accounting. */
510         CDEBUG(D_CACHE,
511                "%s: cli %s/%p reports grant %llu dropped %u, local %lu\n",
512                obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
513                oa->o_dropped, ted->ted_grant);
514
515         if ((long long)oa->o_dirty < 0)
516                 oa->o_dirty = 0;
517
518         /* inflate grant counters if required */
519         if (!exp_grant_param_supp(exp)) {
520                 oa->o_grant     = tgt_grant_inflate(tgd, oa->o_grant);
521                 oa->o_dirty     = tgt_grant_inflate(tgd, oa->o_dirty);
522                 oa->o_dropped   = tgt_grant_inflate(tgd, (u64)oa->o_dropped);
523                 oa->o_undirty   = tgt_grant_inflate(tgd, oa->o_undirty);
524         }
525
526         dirty = oa->o_dirty;
527         dropped = oa->o_dropped;
528
529         /* Update our accounting now so that statfs takes it into account.
530          * Note that ted_dirty is only approximate and can become incorrect
531          * if RPCs arrive out-of-order.  No important calculations depend
532          * on ted_dirty however, but we must check sanity to not assert. */
533         if (dirty > ted->ted_grant + 4 * chunk)
534                 dirty = ted->ted_grant + 4 * chunk;
535         tgd->tgd_tot_dirty += dirty - ted->ted_dirty;
536         if (ted->ted_grant < dropped) {
537                 CDEBUG(D_CACHE,
538                        "%s: cli %s/%p reports %lu dropped > grant %lu\n",
539                        obd->obd_name, exp->exp_client_uuid.uuid, exp, dropped,
540                        ted->ted_grant);
541                 dropped = 0;
542         }
543         if (tgd->tgd_tot_granted < dropped) {
544                 CERROR("%s: cli %s/%p reports %lu dropped > tot_grant %llu\n",
545                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
546                        dropped, tgd->tgd_tot_granted);
547                 dropped = 0;
548         }
549         tgd->tgd_tot_granted -= dropped;
550         ted->ted_grant -= dropped;
551         ted->ted_dirty = dirty;
552
553         if (ted->ted_dirty < 0 || ted->ted_grant < 0 || ted->ted_pending < 0) {
554                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
555                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
556                        ted->ted_dirty, ted->ted_pending, ted->ted_grant);
557                 spin_unlock(&tgd->tgd_grant_lock);
558                 LBUG();
559         }
560         EXIT;
561 }
562
563 /**
564  * Grant shrink request handler.
565  *
566  * Client nodes can explicitly release grant space (i.e. process called grant
567  * shrinking). This function proceeds with the shrink request when there is
568  * less ungranted space remaining than the amount all of the connected clients
569  * would consume if they used their full grant.
570  * Caller must hold tgd_grant_lock spinlock.
571  *
572  * \param[in] exp               export releasing grant space
573  * \param[in,out] oa            incoming obdo sent by the client
574  * \param[in] left_space        remaining free space with space already granted
575  *                              taken out
576  */
577 static void tgt_grant_shrink(struct obd_export *exp, struct obdo *oa,
578                              u64 left_space)
579 {
580         struct tg_export_data   *ted = &exp->exp_target_data;
581         struct obd_device       *obd = exp->exp_obd;
582         struct tg_grants_data   *tgd = &obd->u.obt.obt_lut->lut_tgd;
583         long                     grant_shrink;
584
585         assert_spin_locked(&tgd->tgd_grant_lock);
586         LASSERT(exp);
587         if (left_space >= tgd->tgd_tot_granted_clients *
588                           TGT_GRANT_SHRINK_LIMIT(exp))
589                 return;
590
591         grant_shrink = oa->o_grant;
592
593         ted->ted_grant -= grant_shrink;
594         tgd->tgd_tot_granted -= grant_shrink;
595
596         CDEBUG(D_CACHE, "%s: cli %s/%p shrink %ld ted_grant %ld total %llu\n",
597                obd->obd_name, exp->exp_client_uuid.uuid, exp, grant_shrink,
598                ted->ted_grant, tgd->tgd_tot_granted);
599
600         /* client has just released some grant, don't grant any space back */
601         oa->o_grant = 0;
602 }
603
604 /**
605  * Calculate how much space is required to write a given network buffer
606  *
607  * This function takes block alignment into account to estimate how much on-disk
608  * space will be required to successfully write the whole niobuf.
609  * Estimated space is inflated if the export does not support
610  * OBD_CONNECT_GRANT_PARAM and if the backend filesystem has a block size
611  * larger than the minimal supported page size (i.e. 4KB).
612  *
613  * \param[in] exp       export associated which the write request
614  *                      if NULL, then size estimate is done for server-side
615  *                      grant allocation.
616  * \param[in] lut       LU target handling the request
617  * \param[in] rnb       network buffer to estimate size of
618  *
619  * \retval              space (in bytes) that will be consumed to write the
620  *                      network buffer
621  */
622 static inline u64 tgt_grant_rnb_size(struct obd_export *exp,
623                                      struct lu_target *lut,
624                                      struct niobuf_remote *rnb)
625 {
626         struct tg_grants_data *tgd = &lut->lut_tgd;
627         u64 blksize;
628         u64 bytes;
629         u64 end;
630
631         if (exp && !exp_grant_param_supp(exp) &&
632             tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT)
633                 blksize = 1ULL << COMPAT_BSIZE_SHIFT;
634         else
635                 blksize = 1ULL << tgd->tgd_blockbits;
636
637         /* The network buffer might span several blocks, align it on block
638          * boundaries */
639         bytes  = rnb->rnb_offset & (blksize - 1);
640         bytes += rnb->rnb_len;
641         end    = bytes & (blksize - 1);
642         if (end)
643                 bytes += blksize - end;
644
645         if (exp == NULL || exp_grant_param_supp(exp)) {
646                 /* add per-extent insertion cost */
647                 u64 max_ext;
648                 int nr_ext;
649
650                 max_ext = blksize * lut->lut_dt_conf.ddp_max_extent_blks;
651                 nr_ext = (bytes + max_ext - 1) / max_ext;
652                 bytes += nr_ext * lut->lut_dt_conf.ddp_extent_tax;
653         } else {
654                 /* Inflate grant space if client does not support extent-based
655                  * grant allocation */
656                 bytes = tgt_grant_inflate(tgd, (u64)bytes);
657         }
658
659         return bytes;
660 }
661
662 /**
663  * Validate grant accounting for each incoming remote network buffer.
664  *
665  * When clients have dirtied as much space as they've been granted they
666  * fall through to sync writes. These sync writes haven't been expressed
667  * in grants and need to error with ENOSPC when there isn't room in the
668  * filesystem for them after grants are taken into account. However,
669  * writeback of the dirty data that was already granted space can write
670  * right on through.
671  * The OBD_BRW_GRANTED flag will be set in the rnb_flags of each network
672  * buffer which has been granted enough space to proceed. Buffers without
673  * this flag will fail to be written with -ENOSPC (see tgt_preprw_write().
674  * Caller must hold tgd_grant_lock spinlock.
675  *
676  * \param[in] env       LU environment passed by the caller
677  * \param[in] exp       export identifying the client which sent the RPC
678  * \param[in] oa        incoming obdo in which we should return the pack the
679  *                      additional grant
680  * \param[in,out] rnb   the list of network buffers
681  * \param[in] niocount  the number of network buffers in the list
682  * \param[in] left      the remaining free space with space already granted
683  *                      taken out
684  */
685 static void tgt_grant_check(const struct lu_env *env, struct obd_export *exp,
686                             struct obdo *oa, struct niobuf_remote *rnb,
687                             int niocount, u64 *left)
688 {
689         struct tg_export_data   *ted = &exp->exp_target_data;
690         struct obd_device       *obd = exp->exp_obd;
691         struct lu_target        *lut = obd->u.obt.obt_lut;
692         struct tg_grants_data   *tgd = &lut->lut_tgd;
693         unsigned long            ungranted = 0;
694         unsigned long            granted = 0;
695         int                      i;
696         bool                     skip = false;
697
698         ENTRY;
699
700         assert_spin_locked(&tgd->tgd_grant_lock);
701
702         if (obd->obd_recovering) {
703                 /* Replaying write. Grant info have been processed already so no
704                  * need to do any enforcement here. It is worth noting that only
705                  * bulk writes with all rnbs having OBD_BRW_FROM_GRANT can be
706                  * replayed. If one page hasn't OBD_BRW_FROM_GRANT set, then
707                  * the whole bulk is written synchronously */
708                 skip = true;
709                 CDEBUG(D_CACHE, "Replaying write, skipping accounting\n");
710         } else if ((oa->o_valid & OBD_MD_FLFLAGS) &&
711                    (oa->o_flags & OBD_FL_RECOV_RESEND)) {
712                 /* Recoverable resend, grant info have already been processed as
713                  * well */
714                 skip = true;
715                 CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
716                                 "accounting\n");
717         } else if (exp_grant_param_supp(exp) && oa->o_grant_used > 0) {
718                 /* Client supports the new grant parameters and is telling us
719                  * how much grant space it consumed for this bulk write.
720                  * Although all rnbs are supposed to have the OBD_BRW_FROM_GRANT
721                  * flag set, we will scan the rnb list and looks for non-cache
722                  * I/O in case it changes in the future */
723                 if (ted->ted_grant >= oa->o_grant_used) {
724                         /* skip grant accounting for rnbs with
725                          * OBD_BRW_FROM_GRANT and just used grant consumption
726                          * claimed in the request */
727                         granted = oa->o_grant_used;
728                         skip = true;
729                 } else {
730                         /* client has used more grants for this request that
731                          * it owns ... */
732                         CERROR("%s: cli %s claims %lu GRANT, real grant %lu\n",
733                                exp->exp_obd->obd_name,
734                                exp->exp_client_uuid.uuid,
735                                (unsigned long)oa->o_grant_used, ted->ted_grant);
736
737                         /* check whether we can fill the gap with unallocated
738                          * grant */
739                         if (*left > (oa->o_grant_used - ted->ted_grant)) {
740                                 /* ouf .. we are safe for now */
741                                 granted = ted->ted_grant;
742                                 ungranted = oa->o_grant_used - granted;
743                                 *left -= ungranted;
744                                 skip = true;
745                         }
746                         /* too bad, but we cannot afford to blow up our grant
747                          * accounting. The loop below will handle each rnb in
748                          * case by case. */
749                 }
750         }
751
752         for (i = 0; i < niocount; i++) {
753                 int bytes;
754
755                 if ((rnb[i].rnb_flags & OBD_BRW_FROM_GRANT)) {
756                         if (skip) {
757                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
758                                 continue;
759                         }
760
761                         /* compute how much grant space is actually needed for
762                          * this rnb, inflate grant if required */
763                         bytes = tgt_grant_rnb_size(exp, lut, &rnb[i]);
764                         if (ted->ted_grant >= granted + bytes) {
765                                 granted += bytes;
766                                 rnb[i].rnb_flags |= OBD_BRW_GRANTED;
767                                 continue;
768                         }
769
770                         CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d GRANT, "
771                                "real grant %lu idx %d\n", obd->obd_name,
772                                exp->exp_client_uuid.uuid, exp, granted, bytes,
773                                ted->ted_grant, i);
774                 }
775
776                 if (obd->obd_recovering)
777                         CERROR("%s: cli %s is replaying OST_WRITE while one rnb"
778                                " hasn't OBD_BRW_FROM_GRANT set (0x%x)\n",
779                                obd->obd_name, exp->exp_client_uuid.uuid,
780                                rnb[i].rnb_flags);
781
782                 /* Consume grant space on the server.
783                  * Unlike above, tgt_grant_rnb_size() is called with exp = NULL
784                  * so that the required grant space isn't inflated. This is
785                  * done on purpose since the server can deal with large block
786                  * size, unlike some clients */
787                 bytes = tgt_grant_rnb_size(NULL, lut, &rnb[i]);
788                 if (*left > bytes) {
789                         /* if enough space, pretend it was granted */
790                         ungranted += bytes;
791                         *left -= bytes;
792                         rnb[i].rnb_flags |= OBD_BRW_GRANTED;
793                         continue;
794                 }
795
796                 /* We can't check for already-mapped blocks here (make sense
797                  * when backend filesystem does not use COW) as it requires
798                  * dropping the grant lock.
799                  * Instead, we clear OBD_BRW_GRANTED and in that case we need
800                  * to go through and verify if all of the blocks not marked
801                  *  BRW_GRANTED are already mapped and we can ignore this error.
802                  */
803                 rnb[i].rnb_flags &= ~OBD_BRW_GRANTED;
804                 CDEBUG(D_CACHE, "%s: cli %s/%p idx %d no space for %d\n",
805                        obd->obd_name, exp->exp_client_uuid.uuid, exp, i, bytes);
806         }
807
808         /* record in o_grant_used the actual space reserved for the I/O, will be
809          * used later in tgt_grant_commmit() */
810         oa->o_grant_used = granted + ungranted;
811
812         /* record space used for the I/O, will be used in tgt_grant_commmit() */
813         /* Now substract what the clients has used already.  We don't subtract
814          * this from the tot_granted yet, so that other client's can't grab
815          * that space before we have actually allocated our blocks. That
816          * happens in tgt_grant_commit() after the writes are done. */
817         ted->ted_grant -= granted;
818         ted->ted_pending += oa->o_grant_used;
819         tgd->tgd_tot_granted += ungranted;
820         tgd->tgd_tot_pending += oa->o_grant_used;
821
822         CDEBUG(D_CACHE,
823                "%s: cli %s/%p granted: %lu ungranted: %lu grant: %lu dirty: %lu"
824                "\n", obd->obd_name, exp->exp_client_uuid.uuid, exp,
825                granted, ungranted, ted->ted_grant, ted->ted_dirty);
826
827         if (obd->obd_recovering || (oa->o_valid & OBD_MD_FLGRANT) == 0)
828                 /* don't update dirty accounting during recovery or
829                  * if grant information got discarded (e.g. during resend) */
830                 RETURN_EXIT;
831
832         if (ted->ted_dirty < granted) {
833                 CWARN("%s: cli %s/%p claims granted %lu > ted_dirty %lu\n",
834                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
835                        granted, ted->ted_dirty);
836                 granted = ted->ted_dirty;
837         }
838         tgd->tgd_tot_dirty -= granted;
839         ted->ted_dirty -= granted;
840
841         if (ted->ted_dirty < 0 || ted->ted_grant < 0 || ted->ted_pending < 0) {
842                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
843                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
844                        ted->ted_dirty, ted->ted_pending, ted->ted_grant);
845                 spin_unlock(&tgd->tgd_grant_lock);
846                 LBUG();
847         }
848         EXIT;
849 }
850
851 /**
852  * Allocate additional grant space to a client
853  *
854  * Calculate how much grant space to return to client, based on how much space
855  * is currently free and how much of that is already granted.
856  * Caller must hold tgd_grant_lock spinlock.
857  *
858  * \param[in] exp               export of the client which sent the request
859  * \param[in] curgrant          current grant claimed by the client
860  * \param[in] want              how much grant space the client would like to
861  *                              have
862  * \param[in] left              remaining free space with granted space taken
863  *                              out
864  * \param[in] conservative      if set to true, the server should be cautious
865  *                              and limit how much space is granted back to the
866  *                              client. Otherwise, the server should try hard to
867  *                              satisfy the client request.
868  *
869  * \retval                      amount of grant space allocated
870  */
871 static long tgt_grant_alloc(struct obd_export *exp, u64 curgrant,
872                             u64 want, u64 left, long chunk,
873                             bool conservative)
874 {
875         struct obd_device       *obd = exp->exp_obd;
876         struct tg_grants_data   *tgd = &obd->u.obt.obt_lut->lut_tgd;
877         struct tg_export_data   *ted = &exp->exp_target_data;
878         u64                      grant;
879
880         ENTRY;
881
882         /* When tgd_grant_compat_disable is set, we don't grant any space to
883          * clients not supporting OBD_CONNECT_GRANT_PARAM.
884          * Otherwise, space granted to such a client is inflated since it
885          * consumes PAGE_SIZE of grant space per block */
886         if ((obd->obd_self_export != exp && !exp_grant_param_supp(exp) &&
887              tgd->tgd_grant_compat_disable) || left == 0 || exp->exp_failed)
888                 RETURN(0);
889
890         if (want > 0x7fffffff) {
891                 CERROR("%s: client %s/%p requesting > 2GB grant %llu\n",
892                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
893                 RETURN(0);
894         }
895
896         /* Grant some fraction of the client's requested grant space so that
897          * they are not always waiting for write credits (not all of it to
898          * avoid overgranting in face of multiple RPCs in flight).  This
899          * essentially will be able to control the OSC_MAX_RIF for a client.
900          *
901          * If we do have a large disparity between what the client thinks it
902          * has and what we think it has, don't grant very much and let the
903          * client consume its grant first.  Either it just has lots of RPCs
904          * in flight, or it was evicted and its grants will soon be used up. */
905         if (curgrant >= want || curgrant >= ted->ted_grant + chunk)
906                 RETURN(0);
907
908         if (obd->obd_recovering)
909                 conservative = false;
910
911         if (conservative)
912                 /* don't grant more than 1/8th of the remaining free space in
913                  * one chunk */
914                 left >>= 3;
915         grant = min(want - curgrant, left);
916         /* round grant up to the next block size */
917         grant = (grant + (1 << tgd->tgd_blockbits) - 1) &
918                 ~((1ULL << tgd->tgd_blockbits) - 1);
919
920         if (!grant)
921                 RETURN(0);
922
923         /* Limit to grant_chunk if not reconnect/recovery */
924         if ((grant > chunk) && conservative)
925                 grant = chunk;
926
927         tgd->tgd_tot_granted += grant;
928         ted->ted_grant += grant;
929
930         if (ted->ted_grant < 0) {
931                 CERROR("%s: cli %s/%p grant %ld want %llu current %llu\n",
932                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
933                        ted->ted_grant, want, curgrant);
934                 spin_unlock(&tgd->tgd_grant_lock);
935                 LBUG();
936         }
937
938         CDEBUG(D_CACHE,
939                "%s: cli %s/%p wants: %llu current grant %llu"
940                " granting: %llu\n", obd->obd_name, exp->exp_client_uuid.uuid,
941                exp, want, curgrant, grant);
942         CDEBUG(D_CACHE,
943                "%s: cli %s/%p tot cached:%llu granted:%llu"
944                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
945                exp, tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
946                obd->obd_num_exports);
947
948         RETURN(grant);
949 }
950
951 /**
952  * Handle grant space allocation on client connection & reconnection.
953  *
954  * A new non-readonly connection gets an initial grant allocation equals to
955  * tgt_grant_chunk() (i.e. twice the max BRW size in most of the cases).
956  * On reconnection, grant counters between client & target are resynchronized
957  * and additional space might be granted back if possible.
958  *
959  * \param[in] env       LU environment provided by the caller
960  * \param[in] exp       client's export which is (re)connecting
961  * \param[in,out] data  obd_connect_data structure sent by the client in the
962  *                      connect request
963  * \param[in] new_conn  must set to true if this is a new connection and false
964  *                      for a reconnection
965  */
966 void tgt_grant_connect(const struct lu_env *env, struct obd_export *exp,
967                        struct obd_connect_data *data, bool new_conn)
968 {
969         struct lu_target        *lut = exp->exp_obd->u.obt.obt_lut;
970         struct tg_grants_data   *tgd = &lut->lut_tgd;
971         struct tg_export_data   *ted = &exp->exp_target_data;
972         u64                      left = 0;
973         u64                      want;
974         long                     chunk;
975         int                      from_cache;
976         int                      force = 0; /* can use cached data */
977
978         /* don't grant space to client with read-only access */
979         if (OCD_HAS_FLAG(data, RDONLY) ||
980             (!OCD_HAS_FLAG(data, GRANT_PARAM) &&
981              tgd->tgd_grant_compat_disable)) {
982                 data->ocd_grant = 0;
983                 data->ocd_connect_flags &= ~(OBD_CONNECT_GRANT |
984                                              OBD_CONNECT_GRANT_PARAM);
985                 RETURN_EXIT;
986         }
987
988         if (OCD_HAS_FLAG(data, GRANT_PARAM))
989                 want = data->ocd_grant;
990         else
991                 want = tgt_grant_inflate(tgd, data->ocd_grant);
992         chunk = tgt_grant_chunk(exp, lut, data);
993 refresh:
994         tgt_grant_statfs(env, exp, force, &from_cache);
995
996         spin_lock(&tgd->tgd_grant_lock);
997
998         /* Grab free space from cached info and take out space already granted
999          * to clients as well as reserved space */
1000         left = tgt_grant_space_left(exp);
1001
1002         /* get fresh statfs data if we are short in ungranted space */
1003         if (from_cache && left < 32 * chunk) {
1004                 spin_unlock(&tgd->tgd_grant_lock);
1005                 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
1006                 force = 1;
1007                 goto refresh;
1008         }
1009
1010         tgt_grant_alloc(exp, (u64)ted->ted_grant, want, left, chunk, new_conn);
1011
1012         /* return to client its current grant */
1013         if (OCD_HAS_FLAG(data, GRANT_PARAM))
1014                 data->ocd_grant = ted->ted_grant;
1015         else
1016                 /* deflate grant */
1017                 data->ocd_grant = tgt_grant_deflate(tgd, (u64)ted->ted_grant);
1018
1019         /* reset dirty accounting */
1020         tgd->tgd_tot_dirty -= ted->ted_dirty;
1021         ted->ted_dirty = 0;
1022
1023         if (new_conn && OCD_HAS_FLAG(data, GRANT))
1024                 tgd->tgd_tot_granted_clients++;
1025
1026         spin_unlock(&tgd->tgd_grant_lock);
1027
1028         CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %d want: %llu left: %llu\n",
1029                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
1030                exp, data->ocd_grant, want, left);
1031
1032         EXIT;
1033 }
1034 EXPORT_SYMBOL(tgt_grant_connect);
1035
1036 /**
1037  * Release all grant space attached to a given export.
1038  *
1039  * Remove a client from the grant accounting totals.  We also remove
1040  * the export from the obd device under the osfs and dev locks to ensure
1041  * that the tgt_grant_sanity_check() calculations are always valid.
1042  * The client should do something similar when it invalidates its import.
1043  *
1044  * \param[in] exp       client's export to remove from grant accounting
1045  */
1046 void tgt_grant_discard(struct obd_export *exp)
1047 {
1048         struct obd_device       *obd = exp->exp_obd;
1049         struct tg_grants_data   *tgd = &obd->u.obt.obt_lut->lut_tgd;
1050         struct tg_export_data   *ted = &exp->exp_target_data;
1051
1052         spin_lock(&tgd->tgd_grant_lock);
1053         LASSERTF(tgd->tgd_tot_granted >= ted->ted_grant,
1054                  "%s: tot_granted %llu cli %s/%p ted_grant %ld\n",
1055                  obd->obd_name, tgd->tgd_tot_granted,
1056                  exp->exp_client_uuid.uuid, exp, ted->ted_grant);
1057         tgd->tgd_tot_granted -= ted->ted_grant;
1058         ted->ted_grant = 0;
1059         LASSERTF(tgd->tgd_tot_pending >= ted->ted_pending,
1060                  "%s: tot_pending %llu cli %s/%p ted_pending %ld\n",
1061                  obd->obd_name, tgd->tgd_tot_pending,
1062                  exp->exp_client_uuid.uuid, exp, ted->ted_pending);
1063         /* tgd_tot_pending is handled in tgt_grant_commit as bulk
1064          * commmits */
1065         LASSERTF(tgd->tgd_tot_dirty >= ted->ted_dirty,
1066                  "%s: tot_dirty %llu cli %s/%p ted_dirty %ld\n",
1067                  obd->obd_name, tgd->tgd_tot_dirty,
1068                  exp->exp_client_uuid.uuid, exp, ted->ted_dirty);
1069         tgd->tgd_tot_dirty -= ted->ted_dirty;
1070         ted->ted_dirty = 0;
1071         spin_unlock(&tgd->tgd_grant_lock);
1072 }
1073 EXPORT_SYMBOL(tgt_grant_discard);
1074
1075 /**
1076  * Process grant information from incoming bulk read request.
1077  *
1078  * Extract grant information packed in obdo structure (OBD_MD_FLGRANT set in
1079  * o_valid). Bulk reads usually comes with grant announcements (number of dirty
1080  * blocks, remaining amount of grant space, ...) and could also include a grant
1081  * shrink request. Unlike bulk write, no additional grant space is returned on
1082  * bulk read request.
1083  *
1084  * \param[in] env       is the lu environment provided by the caller
1085  * \param[in] exp       is the export of the client which sent the request
1086  * \param[in,out] oa    is the incoming obdo sent by the client
1087  */
1088 void tgt_grant_prepare_read(const struct lu_env *env,
1089                             struct obd_export *exp, struct obdo *oa)
1090 {
1091         struct lu_target        *lut = exp->exp_obd->u.obt.obt_lut;
1092         struct tg_grants_data   *tgd = &lut->lut_tgd;
1093         int                      do_shrink;
1094         u64                      left = 0;
1095
1096         ENTRY;
1097
1098         if (!oa)
1099                 RETURN_EXIT;
1100
1101         if ((oa->o_valid & OBD_MD_FLGRANT) == 0)
1102                 /* The read request does not contain any grant
1103                  * information */
1104                 RETURN_EXIT;
1105
1106         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1107             (oa->o_flags & OBD_FL_SHRINK_GRANT)) {
1108                 /* To process grant shrink request, we need to know how much
1109                  * available space remains on the backend filesystem.
1110                  * Shrink requests are not so common, we always get fresh
1111                  * statfs information. */
1112                 tgt_grant_statfs(env, exp, 1, NULL);
1113
1114                 /* protect all grant counters */
1115                 spin_lock(&tgd->tgd_grant_lock);
1116
1117                 /* Grab free space from cached statfs data and take out space
1118                  * already granted to clients as well as reserved space */
1119                 left = tgt_grant_space_left(exp);
1120
1121                 /* all set now to proceed with shrinking */
1122                 do_shrink = 1;
1123         } else {
1124                 /* no grant shrinking request packed in the obdo and
1125                  * since we don't grant space back on reads, no point
1126                  * in running statfs, so just skip it and process
1127                  * incoming grant data directly. */
1128                 spin_lock(&tgd->tgd_grant_lock);
1129                 do_shrink = 0;
1130         }
1131
1132         /* extract incoming grant information provided by the client and
1133          * inflate grant counters if required */
1134         tgt_grant_incoming(env, exp, oa, tgt_grant_chunk(exp, lut, NULL));
1135
1136         /* unlike writes, we don't return grants back on reads unless a grant
1137          * shrink request was packed and we decided to turn it down. */
1138         if (do_shrink)
1139                 tgt_grant_shrink(exp, oa, left);
1140         else
1141                 oa->o_grant = 0;
1142
1143         if (!exp_grant_param_supp(exp))
1144                 oa->o_grant = tgt_grant_deflate(tgd, oa->o_grant);
1145         spin_unlock(&tgd->tgd_grant_lock);
1146         EXIT;
1147 }
1148 EXPORT_SYMBOL(tgt_grant_prepare_read);
1149
1150 /**
1151  * Process grant information from incoming bulk write request.
1152  *
1153  * This function extracts client's grant announcements from incoming bulk write
1154  * request and attempts to allocate grant space for network buffers that need it
1155  * (i.e. OBD_BRW_FROM_GRANT not set in rnb_fags).
1156  * Network buffers which aren't granted the OBD_BRW_GRANTED flag should not
1157  * proceed further and should fail with -ENOSPC.
1158  * Whenever possible, additional grant space will be returned to the client
1159  * in the bulk write reply.
1160  * tgt_grant_prepare_write() must be called before writting any buffers to
1161  * the backend storage. This function works in pair with tgt_grant_commit()
1162  * which must be invoked once all buffers have been written to disk in order
1163  * to release space from the pending grant counter.
1164  *
1165  * \param[in] env       LU environment provided by the caller
1166  * \param[in] exp       export of the client which sent the request
1167  * \param[in] oa        incoming obdo sent by the client
1168  * \param[in] rnb       list of network buffers
1169  * \param[in] niocount  number of network buffers in the list
1170  */
1171 void tgt_grant_prepare_write(const struct lu_env *env,
1172                              struct obd_export *exp, struct obdo *oa,
1173                              struct niobuf_remote *rnb, int niocount)
1174 {
1175         struct obd_device       *obd = exp->exp_obd;
1176         struct lu_target        *lut = obd->u.obt.obt_lut;
1177         struct tg_grants_data   *tgd = &lut->lut_tgd;
1178         u64                      left;
1179         int                      from_cache;
1180         int                      force = 0; /* can use cached data intially */
1181         long                     chunk = tgt_grant_chunk(exp, lut, NULL);
1182
1183         ENTRY;
1184
1185 refresh:
1186         /* get statfs information from OSD layer */
1187         tgt_grant_statfs(env, exp, force, &from_cache);
1188
1189         spin_lock(&tgd->tgd_grant_lock); /* protect all grant counters */
1190
1191         /* Grab free space from cached statfs data and take out space already
1192          * granted to clients as well as reserved space */
1193         left = tgt_grant_space_left(exp);
1194
1195         /* Get fresh statfs data if we are short in ungranted space */
1196         if (from_cache && left < 32 * chunk) {
1197                 spin_unlock(&tgd->tgd_grant_lock);
1198                 CDEBUG(D_CACHE, "%s: fs has no space left and statfs too old\n",
1199                        obd->obd_name);
1200                 force = 1;
1201                 goto refresh;
1202         }
1203
1204         /* When close to free space exhaustion, trigger a sync to force
1205          * writeback cache to consume required space immediately and release as
1206          * much space as possible. */
1207         if (!obd->obd_recovering && force != 2 && left < chunk) {
1208                 bool from_grant = true;
1209                 int  i;
1210
1211                 /* That said, it is worth running a sync only if some pages did
1212                  * not consume grant space on the client and could thus fail
1213                  * with ENOSPC later in tgt_grant_check() */
1214                 for (i = 0; i < niocount; i++)
1215                         if (!(rnb[i].rnb_flags & OBD_BRW_FROM_GRANT))
1216                                 from_grant = false;
1217
1218                 if (!from_grant) {
1219                         /* at least one network buffer requires acquiring grant
1220                          * space on the server */
1221                         spin_unlock(&tgd->tgd_grant_lock);
1222                         /* discard errors, at least we tried ... */
1223                         dt_sync(env, lut->lut_bottom);
1224                         force = 2;
1225                         goto refresh;
1226                 }
1227         }
1228
1229         /* extract incoming grant information provided by the client,
1230          * and inflate grant counters if required */
1231         tgt_grant_incoming(env, exp, oa, chunk);
1232
1233         /* check limit */
1234         tgt_grant_check(env, exp, oa, rnb, niocount, &left);
1235
1236         if (!(oa->o_valid & OBD_MD_FLGRANT)) {
1237                 spin_unlock(&tgd->tgd_grant_lock);
1238                 RETURN_EXIT;
1239         }
1240
1241         /* if OBD_FL_SHRINK_GRANT is set, the client is willing to release some
1242          * grant space. */
1243         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1244             (oa->o_flags & OBD_FL_SHRINK_GRANT))
1245                 tgt_grant_shrink(exp, oa, left);
1246         else
1247                 /* grant more space back to the client if possible */
1248                 oa->o_grant = tgt_grant_alloc(exp, oa->o_grant, oa->o_undirty,
1249                                               left, chunk, true);
1250
1251         if (!exp_grant_param_supp(exp))
1252                 oa->o_grant = tgt_grant_deflate(tgd, oa->o_grant);
1253         spin_unlock(&tgd->tgd_grant_lock);
1254         EXIT;
1255 }
1256 EXPORT_SYMBOL(tgt_grant_prepare_write);
1257
1258 /**
1259  * Consume grant space reserved for object creation.
1260  *
1261  * Grant space is allocated to the local self export for object precreation.
1262  * This is required to prevent object precreation from consuming grant space
1263  * allocated to client nodes for the data writeback cache.
1264  * This function consumes enough space to create \a nr objects and allocates
1265  * more grant space to the self export for future precreation requests, if
1266  * possible.
1267  *
1268  * \param[in] env       LU environment provided by the caller
1269  * \param[in] exp       export holding the grant space for precreation (= self
1270  *                      export currently)
1271  * \param[in] nr        number of objects to be created
1272  *
1273  * \retval >= 0         amount of grant space allocated to the precreate request
1274  * \retval -ENOSPC      on failure
1275  */
1276 long tgt_grant_create(const struct lu_env *env, struct obd_export *exp, int *nr)
1277 {
1278         struct lu_target        *lut = exp->exp_obd->u.obt.obt_lut;
1279         struct tg_grants_data   *tgd = &lut->lut_tgd;
1280         struct tg_export_data   *ted = &exp->exp_target_data;
1281         u64                      left = 0;
1282         unsigned long            wanted;
1283         unsigned long            granted;
1284         ENTRY;
1285
1286         if (exp->exp_obd->obd_recovering ||
1287             lut->lut_dt_conf.ddp_inodespace == 0)
1288                 /* don't enforce grant during recovery */
1289                 RETURN(0);
1290
1291         /* Update statfs data if required */
1292         tgt_grant_statfs(env, exp, 1, NULL);
1293
1294         /* protect all grant counters */
1295         spin_lock(&tgd->tgd_grant_lock);
1296
1297         /* fail precreate request if there is not enough blocks available for
1298          * writing */
1299         if (tgd->tgd_osfs.os_bavail - (ted->ted_grant >> tgd->tgd_blockbits) <
1300             (tgd->tgd_osfs.os_blocks >> 10)) {
1301                 spin_unlock(&tgd->tgd_grant_lock);
1302                 CDEBUG(D_RPCTRACE, "%s: not enough space for create %llu\n",
1303                        exp->exp_obd->obd_name,
1304                        tgd->tgd_osfs.os_bavail * tgd->tgd_osfs.os_blocks);
1305                 RETURN(-ENOSPC);
1306         }
1307
1308         /* Grab free space from cached statfs data and take out space
1309          * already granted to clients as well as reserved space */
1310         left = tgt_grant_space_left(exp);
1311
1312         /* compute how much space is required to handle the precreation
1313          * request */
1314         wanted = *nr * lut->lut_dt_conf.ddp_inodespace;
1315         if (wanted > ted->ted_grant + left) {
1316                 /* that's beyond what remains, adjust the number of objects that
1317                  * can be safely precreated */
1318                 wanted = ted->ted_grant + left;
1319                 *nr = wanted / lut->lut_dt_conf.ddp_inodespace;
1320                 if (*nr == 0) {
1321                         /* we really have no space any more for precreation,
1322                          * fail the precreate request with ENOSPC */
1323                         spin_unlock(&tgd->tgd_grant_lock);
1324                         RETURN(-ENOSPC);
1325                 }
1326                 /* compute space needed for the new number of creations */
1327                 wanted = *nr * lut->lut_dt_conf.ddp_inodespace;
1328         }
1329         LASSERT(wanted <= ted->ted_grant + left);
1330
1331         if (wanted <= ted->ted_grant) {
1332                 /* we've enough grant space to handle this precreate request */
1333                 ted->ted_grant -= wanted;
1334         } else {
1335                 /* we need to take some space from the ungranted pool */
1336                 tgd->tgd_tot_granted += wanted - ted->ted_grant;
1337                 left -= wanted - ted->ted_grant;
1338                 ted->ted_grant = 0;
1339         }
1340         granted = wanted;
1341         ted->ted_pending += granted;
1342         tgd->tgd_tot_pending += granted;
1343
1344         /* grant more space for precreate purpose if possible. */
1345         wanted = OST_MAX_PRECREATE * lut->lut_dt_conf.ddp_inodespace / 2;
1346         if (wanted > ted->ted_grant) {
1347                 long chunk;
1348
1349                 /* always try to book enough space to handle a large precreate
1350                  * request */
1351                 chunk = tgt_grant_chunk(exp, lut, NULL);
1352                 wanted -= ted->ted_grant;
1353                 tgt_grant_alloc(exp, ted->ted_grant, wanted, left, chunk,
1354                                 false);
1355         }
1356         spin_unlock(&tgd->tgd_grant_lock);
1357         RETURN(granted);
1358 }
1359 EXPORT_SYMBOL(tgt_grant_create);
1360
1361 /**
1362  * Release grant space added to the pending counter by tgt_grant_prepare_write()
1363  *
1364  * Update pending grant counter once buffers have been written to the disk.
1365  *
1366  * \param[in] exp       export of the client which sent the request
1367  * \param[in] pending   amount of reserved space to be released
1368  * \param[in] rc        return code of pre-commit operations
1369  */
1370 void tgt_grant_commit(struct obd_export *exp, unsigned long pending,
1371                       int rc)
1372 {
1373         struct tg_grants_data *tgd = &exp->exp_obd->u.obt.obt_lut->lut_tgd;
1374
1375         ENTRY;
1376
1377         /* get space accounted in tot_pending for the I/O, set in
1378          * tgt_grant_check() */
1379         if (pending == 0)
1380                 RETURN_EXIT;
1381
1382         spin_lock(&tgd->tgd_grant_lock);
1383         /* Don't update statfs data for errors raised before commit (e.g.
1384          * bulk transfer failed, ...) since we know those writes have not been
1385          * processed. For other errors hit during commit, we cannot really tell
1386          * whether or not something was written, so we update statfs data.
1387          * In any case, this should not be fatal since we always get fresh
1388          * statfs data before failing a request with ENOSPC */
1389         if (rc == 0) {
1390                 spin_lock(&tgd->tgd_osfs_lock);
1391                 /* Take pending out of cached statfs data */
1392                 tgd->tgd_osfs.os_bavail -= min_t(u64,
1393                                                  tgd->tgd_osfs.os_bavail,
1394                                                  pending >> tgd->tgd_blockbits);
1395                 if (tgd->tgd_statfs_inflight)
1396                         /* someone is running statfs and want to be notified of
1397                          * writes happening meanwhile */
1398                         tgd->tgd_osfs_inflight += pending;
1399                 spin_unlock(&tgd->tgd_osfs_lock);
1400         }
1401
1402         if (exp->exp_target_data.ted_pending < pending) {
1403                 CERROR("%s: cli %s/%p ted_pending(%lu) < grant_used(%lu)\n",
1404                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1405                        exp->exp_target_data.ted_pending, pending);
1406                 spin_unlock(&tgd->tgd_grant_lock);
1407                 LBUG();
1408         }
1409         exp->exp_target_data.ted_pending -= pending;
1410
1411         if (tgd->tgd_tot_granted < pending) {
1412                 CERROR("%s: cli %s/%p tot_granted(%llu) < grant_used(%lu)\n",
1413                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1414                        tgd->tgd_tot_granted, pending);
1415                 spin_unlock(&tgd->tgd_grant_lock);
1416                 LBUG();
1417         }
1418         tgd->tgd_tot_granted -= pending;
1419
1420         if (tgd->tgd_tot_pending < pending) {
1421                 CERROR("%s: cli %s/%p tot_pending(%llu) < grant_used(%lu)\n",
1422                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
1423                        tgd->tgd_tot_pending, pending);
1424                 spin_unlock(&tgd->tgd_grant_lock);
1425                 LBUG();
1426         }
1427         tgd->tgd_tot_pending -= pending;
1428         spin_unlock(&tgd->tgd_grant_lock);
1429         EXIT;
1430 }
1431 EXPORT_SYMBOL(tgt_grant_commit);
1432
1433 struct tgt_grant_cb {
1434         /* commit callback structure */
1435         struct dt_txn_commit_cb  tgc_cb;
1436         /* export associated with the bulk write */
1437         struct obd_export       *tgc_exp;
1438         /* pending grant to be released */
1439         unsigned long            tgc_granted;
1440 };
1441
1442 /**
1443  * Callback function for grant releasing
1444  *
1445  * Release grant space reserved by the client node.
1446  *
1447  * \param[in] env       execution environment
1448  * \param[in] th        transaction handle
1449  * \param[in] cb        callback data
1450  * \param[in] err       error code
1451  */
1452 static void tgt_grant_commit_cb(struct lu_env *env, struct thandle *th,
1453                                 struct dt_txn_commit_cb *cb, int err)
1454 {
1455         struct tgt_grant_cb *tgc;
1456
1457         tgc = container_of(cb, struct tgt_grant_cb, tgc_cb);
1458
1459         tgt_grant_commit(tgc->tgc_exp, tgc->tgc_granted, err);
1460         class_export_cb_put(tgc->tgc_exp);
1461         OBD_FREE_PTR(tgc);
1462 }
1463
1464 /**
1465  * Add callback for grant releasing
1466  *
1467  * Register a commit callback to release grant space.
1468  *
1469  * \param[in] th        transaction handle
1470  * \param[in] exp       OBD export of client
1471  * \param[in] granted   amount of grant space to be released upon commit
1472  *
1473  * \retval              0 on successful callback adding
1474  * \retval              negative value on error
1475  */
1476 int tgt_grant_commit_cb_add(struct thandle *th, struct obd_export *exp,
1477                             unsigned long granted)
1478 {
1479         struct tgt_grant_cb     *tgc;
1480         struct dt_txn_commit_cb *dcb;
1481         int                      rc;
1482         ENTRY;
1483
1484         OBD_ALLOC_PTR(tgc);
1485         if (tgc == NULL)
1486                 RETURN(-ENOMEM);
1487
1488         tgc->tgc_exp = class_export_cb_get(exp);
1489         tgc->tgc_granted = granted;
1490
1491         dcb = &tgc->tgc_cb;
1492         dcb->dcb_func = tgt_grant_commit_cb;
1493         INIT_LIST_HEAD(&dcb->dcb_linkage);
1494         strlcpy(dcb->dcb_name, "tgt_grant_commit_cb", sizeof(dcb->dcb_name));
1495
1496         rc = dt_trans_cb_add(th, dcb);
1497         if (rc) {
1498                 class_export_cb_put(tgc->tgc_exp);
1499                 OBD_FREE_PTR(tgc);
1500         }
1501
1502         RETURN(rc);
1503 }
1504 EXPORT_SYMBOL(tgt_grant_commit_cb_add);