Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include "qsd_internal.h"
38
39 /**
40  * helper function bumping lqe_pending_req if there is no quota request in
41  * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
42  */
43 static inline int qsd_request_enter(struct lquota_entry *lqe)
44 {
45         /* is there already a quota request in flight? */
46         if (lqe->lqe_pending_req != 0) {
47                 LQUOTA_DEBUG(lqe, "already a request in flight");
48                 return -EBUSY;
49         }
50
51         if (lqe->lqe_pending_rel != 0) {
52                 LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
53                              lqe->lqe_pending_rel);
54                 LBUG();
55         }
56
57         lqe->lqe_pending_req++;
58         return 0;
59 }
60
61 /**
62  * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
63  */
64 static inline void qsd_request_exit(struct lquota_entry *lqe)
65 {
66         if (lqe->lqe_pending_req != 1) {
67                 LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
68                 LBUG();
69         }
70         lqe->lqe_pending_req--;
71         lqe->lqe_pending_rel = 0;
72         cfs_waitq_broadcast(&lqe->lqe_waiters);
73 }
74
75 /**
76  * Check whether a qsd instance is all set to send quota request to master.
77  * This includes checking whether:
78  * - the connection to master is set up and usable,
79  * - the qsd isn't stopping
80  * - reintegration has been successfully completed and all indexes are
81  *   up-to-date
82  *
83  * \param lqe - is the lquota entry for which we would like to send an quota
84  *              request
85  * \param lockh - is the remote handle of the global lock returned on success
86  *
87  * \retval 0 on success, appropriate error on failure
88  */
89 static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
90 {
91         struct qsd_qtype_info   *qqi = lqe2qqi(lqe);
92         struct qsd_instance     *qsd = qqi->qqi_qsd;
93         struct obd_import       *imp = NULL;
94         struct ldlm_lock        *lock;
95         ENTRY;
96
97         read_lock(&qsd->qsd_lock);
98         /* is the qsd about to shut down? */
99         if (qsd->qsd_stopping) {
100                 read_unlock(&qsd->qsd_lock);
101                 LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
102                 /* Target is about to shut down, client will retry */
103                 RETURN(-EINPROGRESS);
104         }
105
106         /* is the connection to the quota master ready? */
107         if (qsd->qsd_exp_valid)
108                 imp = class_exp2cliimp(qsd->qsd_exp);
109         if (imp == NULL || imp->imp_invalid) {
110                 read_unlock(&qsd->qsd_lock);
111                 LQUOTA_DEBUG(lqe, "connection to master not ready");
112                 RETURN(-ENOTCONN);
113         }
114
115         /* In most case, reintegration must have been triggered (when enable
116          * quota or on OST start), however, in rare race condition (enabling
117          * quota when starting OSTs), we might miss triggering reintegration
118          * for some qqi.
119          *
120          * If the previous reintegration failed for some reason, we'll
121          * re-trigger it here as well. */
122         if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
123                 read_unlock(&qsd->qsd_lock);
124                 LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
125                              "kicking off reintegration");
126                 qsd_start_reint_thread(qqi);
127                 RETURN(-EINPROGRESS);
128         }
129
130         /* Fill the remote global lock handle, master will check this handle
131          * to see if the slave is sending request with stale lock */
132         lustre_handle_copy(lockh, &qqi->qqi_lockh);
133         read_unlock(&qsd->qsd_lock);
134
135         if (!lustre_handle_is_used(lockh))
136                 RETURN(-ENOLCK);
137
138         lock = ldlm_handle2lock(lockh);
139         if (lock == NULL)
140                 RETURN(-ENOLCK);
141
142         /* return remote lock handle to be packed in quota request */
143         lustre_handle_copy(lockh, &lock->l_remote_handle);
144         LDLM_LOCK_PUT(lock);
145
146         RETURN(0);
147 }
148
149 /**
150  * Check whether any quota space adjustment (pre-acquire/release/report) is
151  * needed for a given quota ID. If a non-null \a qbody is passed, then the
152  * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
153  * to be packed in the quota request.
154  *
155  * \param lqe   - is the lquota entry for which we would like to adjust quota
156  *                space.
157  * \param qbody - is the quota body to fill, if not NULL.
158  *
159  * \retval true  - space adjustment is required and \a qbody is filled, if not
160  *                 NULL
161  * \retval false - no space adjustment required
162  */
163 static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
164 {
165         __u64   usage, granted;
166         ENTRY;
167
168         usage   = lqe->lqe_usage;
169         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
170         granted = lqe->lqe_granted;
171
172         if (qbody != NULL)
173                 qbody->qb_flags = 0;
174
175         if (!lqe->lqe_enforced) {
176                 /* quota not enforced any more for this ID */
177                 if (granted != 0) {
178                         /* release all quota space unconditionally */
179                         LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
180                         if (qbody != NULL) {
181                                 qbody->qb_count = granted;
182                                 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
183                         }
184                         RETURN(true);
185                 }
186                 RETURN(false);
187         }
188
189         if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
190                 /* No valid per-ID lock
191                  * When reporting quota (during reintegration or on setquota
192                  * glimpse), we should release granted space if usage is 0.
193                  * Otherwise, if the usage is less than granted, we need to
194                  * acquire the per-ID lock to make sure the unused grant can be
195                  * reclaimed by per-ID lock glimpse. */
196                 if (usage == 0) {
197                         /* no on-disk usage and no outstanding activity, release
198                          * space */
199                         if (granted != 0) {
200                                 LQUOTA_DEBUG(lqe, "no usage, releasing all "
201                                              "space");
202                                 if (qbody != NULL) {
203                                         qbody->qb_count = granted;
204                                         qbody->qb_flags = QUOTA_DQACQ_FL_REL;
205                                 }
206                                 RETURN(true);
207                         }
208                         LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
209                                      "do");
210                         RETURN(false);
211                 }
212
213                 if (lqe->lqe_usage < lqe->lqe_granted) {
214                         /* holding quota space w/o any lock, enqueue per-ID lock
215                          * again */
216                         LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
217                         if (qbody != NULL) {
218                                 qbody->qb_count = 0;
219                                 qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
220                         }
221                         RETURN(true);
222                 }
223
224                 if (lqe->lqe_usage > lqe->lqe_granted) {
225                         /* quota overrun, report usage */
226                         LQUOTA_DEBUG(lqe, "overrun, reporting usage");
227                         if (qbody != NULL) {
228                                 qbody->qb_usage = lqe->lqe_usage;
229                                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
230                         }
231                         RETURN(true);
232                 }
233                 LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
234                 RETURN(false);
235         }
236
237         /* valid per-ID lock
238          * Apply good old quota qunit adjustment logic which has been around
239          * since lustre 1.4:
240          * 1. release spare quota space? */
241         if (granted > usage + lqe->lqe_qunit) {
242                 /* pre-release quota space */
243                 if (qbody == NULL)
244                         RETURN(true);
245                 qbody->qb_count = granted - usage;
246                 /* if usage == 0, release all granted space */
247                 if (usage) {
248                         /* try to keep one qunit of quota space */
249                         qbody->qb_count -= lqe->lqe_qunit;
250                         /* but don't release less than qtune to avoid releasing
251                          * space too often */
252                         if (qbody->qb_count < lqe->lqe_qtune)
253                                 qbody->qb_count = lqe->lqe_qtune;
254                 }
255                 qbody->qb_flags = QUOTA_DQACQ_FL_REL;
256                 RETURN(true);
257         }
258
259         /* 2. Any quota overrun? */
260         if (lqe->lqe_usage > lqe->lqe_granted) {
261                 /* we overconsumed quota space, we report usage in request so
262                  * that master can adjust it unconditionally */
263                 if (qbody == NULL)
264                         RETURN(true);
265                 qbody->qb_usage = lqe->lqe_usage;
266                 granted         = lqe->lqe_usage;
267                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
268         }
269
270         /* 3. Time to pre-acquire? */
271         if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
272             lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
273                 /* To pre-acquire quota space, we report how much spare quota
274                  * space the slave currently owns, then the master will grant us
275                  * back how much we can pretend given the current state of
276                  * affairs */
277                 if (qbody == NULL)
278                         RETURN(true);
279                 if (granted <= usage)
280                         qbody->qb_count = 0;
281                 else
282                         qbody->qb_count = granted - usage;
283                 qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
284                 RETURN(true);
285         }
286
287         if (qbody != NULL)
288                 RETURN(qbody->qb_flags != 0);
289         else
290                 RETURN(false);
291 }
292
293 /**
294  * Helper function returning true when quota space need to be adjusted (some
295  * unused space should be free or pre-acquire) and false otherwise.
296  */
297 static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
298 {
299         return qsd_calc_adjust(lqe, NULL);
300 }
301
302 /**
303  * Callback function called when an acquire/release request sent to the master
304  * is completed
305  */
306 static void qsd_req_completion(const struct lu_env *env,
307                                struct qsd_qtype_info *qqi,
308                                struct quota_body *reqbody,
309                                struct quota_body *repbody,
310                                struct lustre_handle *lockh,
311                                union ldlm_wire_lvb *lvb,
312                                void *arg, int ret)
313 {
314         struct lquota_entry     *lqe = (struct lquota_entry *)arg;
315         struct qsd_thread_info  *qti;
316         int                      rc;
317         bool                     adjust = false, cancel = false;
318         ENTRY;
319
320         LASSERT(qqi != NULL && lqe != NULL);
321
322         /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
323          * DT tags set. */
324         rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
325         if (rc) {
326                 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
327                 lqe_write_lock(lqe);
328                 /* can't afford to adjust quota space with no suitable lu_env */
329                 GOTO(out_noadjust, rc);
330         }
331         qti = qsd_info(env);
332
333         lqe_write_lock(lqe);
334         LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
335                      reqbody->qb_flags);
336
337         /* despite -EDQUOT & -EINPROGRESS errors, the master might still
338          * grant us back quota space to adjust quota overrun */
339         if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
340                 if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
341                    ret != -ESHUTDOWN && ret != -EAGAIN)
342                         /* print errors only if return code is unexpected */
343                         LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
344                                      ret, reqbody->qb_flags);
345                 GOTO(out, ret);
346         }
347
348         /* Set the lqe_lockh */
349         if (lustre_handle_is_used(lockh) &&
350             !lustre_handle_equal(lockh, &lqe->lqe_lockh))
351                 lustre_handle_copy(&lqe->lqe_lockh, lockh);
352
353         /* If the replied qb_count is zero, it means master didn't process
354          * the DQACQ since the limit for this ID has been removed, so we
355          * should not update quota entry & slave index copy neither. */
356         if (repbody != NULL && repbody->qb_count != 0) {
357                 LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
358
359                 if (req_is_rel(reqbody->qb_flags)) {
360                         if (lqe->lqe_granted < repbody->qb_count) {
361                                 LQUOTA_ERROR(lqe, "can't release more space "
362                                              "than owned "LPU64"<"LPU64,
363                                              lqe->lqe_granted,
364                                              repbody->qb_count);
365                                 lqe->lqe_granted = 0;
366                         } else {
367                                 lqe->lqe_granted -= repbody->qb_count;
368                         }
369                         /* Cancel the per-ID lock initiatively when there
370                          * isn't any usage & grant, which can avoid master
371                          * sending glimpse unnecessarily to this slave on
372                          * quota revoking */
373                         if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
374                             !lqe->lqe_waiting_write && !lqe->lqe_usage)
375                                 cancel = true;
376                 } else {
377                         lqe->lqe_granted += repbody->qb_count;
378                 }
379                 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
380                 lqe_write_unlock(lqe);
381
382                 /* Update the slave index file in the dedicated thread. So far,
383                  * We don't update the version of slave index copy on DQACQ.
384                  * No locking is necessary since nobody can change
385                  * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
386                 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
387                                  false);
388                 lqe_write_lock(lqe);
389         }
390
391         /* extract information from lvb */
392         if (ret == 0 && lvb != 0) {
393                 if (lvb->l_lquota.lvb_id_qunit != 0)
394                         qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
395                 if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
396                         lqe->lqe_edquot = true;
397                 else
398                         lqe->lqe_edquot = false;
399         } else if (repbody != NULL && repbody->qb_qunit != 0) {
400                 qsd_set_qunit(lqe, repbody->qb_qunit);
401         }
402
403         /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
404          * flooding the master with acquire request. Pre-acquire will be turned
405          * on again as soon as qunit is modified */
406         if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
407                 lqe->lqe_nopreacq = true;
408 out:
409         adjust = qsd_adjust_needed(lqe);
410         if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
411                 lqe->lqe_acq_rc = ret;
412                 lqe->lqe_acq_time = cfs_time_current_64();
413         }
414 out_noadjust:
415         qsd_request_exit(lqe);
416         lqe_write_unlock(lqe);
417
418         /* release reference on per-ID lock */
419         if (lustre_handle_is_used(lockh))
420                 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
421
422         if (cancel) {
423                 qsd_adjust_schedule(lqe, false, true);
424         } else if (adjust) {
425                 if (!ret || ret == -EDQUOT)
426                         qsd_adjust_schedule(lqe, false, false);
427                 else
428                         qsd_adjust_schedule(lqe, true, false);
429         }
430         lqe_putref(lqe);
431
432         if (lvb)
433                 OBD_FREE_PTR(lvb);
434         EXIT;
435 }
436
437 /**
438  * Try to consume local quota space.
439  *
440  * \param lqe   - is the qid entry to be processed
441  * \param space - is the amount of quota space needed to complete the operation
442  *
443  * \retval 0       - success
444  * \retval -EDQUOT - out of quota
445  * \retval -EAGAIN - need to acquire space from master
446  */
447 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
448 {
449         __u64   usage;
450         int     rc;
451         ENTRY;
452
453         if (!lqe->lqe_enforced)
454                 /* not enforced any more, we are good */
455                 RETURN(-ESRCH);
456
457         lqe_write_lock(lqe);
458         /* use latest usage */
459         usage = lqe->lqe_usage;
460         /* take pending write into account */
461         usage += lqe->lqe_pending_write;
462
463         if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
464                 /* Yay! we got enough space */
465                 lqe->lqe_pending_write += space;
466                 lqe->lqe_waiting_write -= space;
467                 rc = 0;
468         } else if (lqe->lqe_edquot) {
469                 rc = -EDQUOT;
470         } else {
471                 rc = -EAGAIN;
472         }
473         lqe_write_unlock(lqe);
474
475         RETURN(rc);
476 }
477
478 /**
479  * Compute how much quota space should be acquire from the master based
480  * on how much is currently granted to this slave and pending/waiting
481  * operations.
482  *
483  * \param lqe - is the lquota entry for which we would like to adjust quota
484  *              space.
485  * \param qbody - is the quota body of the acquire request to fill
486  *
487  * \retval true  - space acquisition is needed and qbody is filled
488  * \retval false - no space acquisition required
489  */
490 static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
491                                     struct quota_body *qbody)
492 {
493         __u64   usage, granted;
494
495         usage   = lqe->lqe_usage;
496         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
497         granted = lqe->lqe_granted;
498
499         qbody->qb_flags = 0;
500
501         /* if we overconsumed quota space, we report usage in request so that
502          * master can adjust it unconditionally */
503         if (lqe->lqe_usage > lqe->lqe_granted) {
504                 qbody->qb_usage = lqe->lqe_usage;
505                 qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
506                 granted = lqe->lqe_usage;
507         }
508
509         /* acquire as much as needed, but not more */
510         if (usage > granted) {
511                 qbody->qb_count  = usage - granted;
512                 qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
513         }
514
515         return qbody->qb_flags != 0;
516 }
517
518 /**
519  * Acquire quota space from master.
520  * There are at most 1 in-flight dqacq/dqrel.
521  *
522  * \param env    - the environment passed by the caller
523  * \param lqe    - is the qid entry to be processed
524  *
525  * \retval 0            - success
526  * \retval -EDQUOT      - out of quota
527  * \retval -EINPROGRESS - inform client to retry write/create
528  * \retval -EBUSY       - already a quota request in flight
529  * \retval -ve          - other appropriate errors
530  */
531 static int qsd_acquire_remote(const struct lu_env *env,
532                               struct lquota_entry *lqe)
533 {
534         struct qsd_thread_info  *qti = qsd_info(env);
535         struct quota_body       *qbody = &qti->qti_body;
536         struct qsd_instance     *qsd;
537         struct qsd_qtype_info   *qqi;
538         int                      rc;
539         ENTRY;
540
541         memset(qbody, 0, sizeof(*qbody));
542         rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
543         if (rc)
544                 RETURN(rc);
545
546         qqi = lqe2qqi(lqe);
547         qsd = qqi->qqi_qsd;
548
549         lqe_write_lock(lqe);
550
551         /* is quota really enforced for this id? */
552         if (!lqe->lqe_enforced) {
553                 lqe_write_unlock(lqe);
554                 LQUOTA_DEBUG(lqe, "quota not enforced any more");
555                 RETURN(0);
556         }
557
558         /* fill qb_count & qb_flags */
559         if (!qsd_calc_acquire(lqe, qbody)) {
560                 lqe_write_unlock(lqe);
561                 LQUOTA_DEBUG(lqe, "No acquire required");
562                 RETURN(0);
563         }
564
565         /* check whether an acquire request completed recently */
566         if (lqe->lqe_acq_rc != 0 &&
567             cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
568                 lqe_write_unlock(lqe);
569                 LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
570                 RETURN(lqe->lqe_acq_rc);
571         }
572
573         /* only 1 quota request in flight for a given ID is allowed */
574         rc = qsd_request_enter(lqe);
575         if (rc) {
576                 lqe_write_unlock(lqe);
577                 RETURN(rc);
578         }
579
580         lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
581         lqe_write_unlock(lqe);
582
583         /* hold a refcount until completion */
584         lqe_getref(lqe);
585
586         /* fill other quota body fields */
587         qbody->qb_fid = qqi->qqi_fid;
588         qbody->qb_id  = lqe->lqe_id;
589
590         /* check whether we already own a valid lock for this ID */
591         rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
592         if (rc) {
593                 union ldlm_wire_lvb *lvb;
594
595                 OBD_ALLOC_PTR(lvb);
596                 if (lvb == NULL) {
597                         rc = -ENOMEM;
598                         qsd_req_completion(env, qqi, qbody, NULL,
599                                            &qti->qti_lockh, NULL, lqe, rc);
600                         RETURN(rc);
601                 }
602                 /* no lock found, should use intent */
603                 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
604                                      IT_QUOTA_DQACQ, qsd_req_completion,
605                                      qqi, lvb, (void *)lqe);
606         } else {
607                 /* lock found, should use regular dqacq */
608                 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
609                                     qsd_req_completion, qqi, &qti->qti_lockh,
610                                     lqe);
611         }
612
613         /* the completion function will be called by qsd_send_dqacq or
614          * qsd_intent_lock */
615         RETURN(rc);
616 }
617
618 /**
619  * Acquire \a space of quota space in order to complete an operation.
620  * Try to consume local quota space first and send acquire request to quota
621  * master if required.
622  *
623  * \param env   - the environment passed by the caller
624  * \param lqe   - is the qid entry to be processed
625  * \param space - is the amount of quota required for the operation
626  * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
627  *
628  * \retval true  - exit from l_wait_event and real return value in \a ret
629  * \retval false - continue waiting
630  */
631 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
632                         long long space, int *ret)
633 {
634         int rc = 0, count;
635         ENTRY;
636
637         for (count = 0; rc == 0; count++) {
638                 LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
639
640                 if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
641                         rc = -EINPROGRESS;
642                         break;
643                 }
644
645                 /* refresh disk usage */
646                 rc = qsd_refresh_usage(env, lqe);
647                 if (rc)
648                         break;
649
650                 /* try to consume local quota space first */
651                 rc = qsd_acquire_local(lqe, space);
652                 if (rc != -EAGAIN)
653                         /* rc == 0, Wouhou! enough local quota space
654                          * rc < 0, something bad happened */
655                          break;
656
657                 /* need to acquire more quota space from master */
658                 rc = qsd_acquire_remote(env, lqe);
659         }
660
661         if (rc == -EBUSY)
662                 /* already a request in flight, continue waiting */
663                 RETURN(false);
664         *ret = rc;
665         RETURN(true); /* exit from l_wait_event */
666 }
667
668 /**
669  * Quota enforcement handler. If local quota can satisfy this operation,
670  * return success, otherwise, acquire more quota from master.
671  * (for write operation, if master isn't available at this moment, return
672  * -EINPROGRESS to inform client to retry the write)
673  *
674  * \param env   - the environment passed by the caller
675  * \param qsd   - is the qsd instance associated with the device in charge
676  *                of the operation.
677  * \param qid   - is the qid information attached in the transaction handle
678  * \param space - is the space required by the operation
679  * \param flags - if the operation is write, return caller no user/group
680  *                and sync commit flags
681  *
682  * \retval 0            - success
683  * \retval -EDQUOT      - out of quota
684  * \retval -EINPROGRESS - inform client to retry write
685  * \retval -ve          - other appropriate errors
686  */
687 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
688                          struct lquota_id_info *qid, long long space,
689                          int *flags)
690 {
691         struct lquota_entry     *lqe;
692         int                      rc, ret = -EINPROGRESS;
693         struct l_wait_info       lwi;
694         ENTRY;
695
696         if (qid->lqi_qentry != NULL) {
697                 /* we already had to deal with this id for this transaction */
698                 lqe = qid->lqi_qentry;
699                 if (!lqe->lqe_enforced)
700                         RETURN(0);
701         } else {
702                 /* look up lquota entry associated with qid */
703                 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
704                 if (IS_ERR(lqe))
705                         RETURN(PTR_ERR(lqe));
706                 if (!lqe->lqe_enforced) {
707                         lqe_putref(lqe);
708                         RETURN(0);
709                 }
710                 qid->lqi_qentry = lqe;
711                 /* lqe will be released in qsd_op_end() */
712         }
713
714         if (space <= 0) {
715                 /* when space is negative or null, we don't need to consume
716                  * quota space. That said, we still want to perform space
717                  * adjustments in qsd_op_end, so we return here, but with
718                  * a reference on the lqe */
719                 if (flags != NULL) {
720                         rc = qsd_refresh_usage(env, lqe);
721                         GOTO(out_flags, rc);
722                 }
723                 RETURN(0);
724         }
725
726         LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
727
728         lqe_write_lock(lqe);
729         lqe->lqe_waiting_write += space;
730         lqe_write_unlock(lqe);
731
732         /* acquire quota space for the operation, cap overall wait time to
733          * prevent a service thread from being stuck for too long */
734         lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
735                           NULL, NULL);
736         rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
737                           &lwi);
738
739         if (rc == 0 && ret == 0) {
740                 qid->lqi_space += space;
741         } else {
742                 if (rc == 0)
743                         rc = ret;
744
745                 LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
746
747                 lqe_write_lock(lqe);
748                 lqe->lqe_waiting_write -= space;
749
750                 if (flags && lqe->lqe_pending_write != 0)
751                         /* Inform OSD layer that there are pending writes.
752                          * It might want to retry after a sync if appropriate */
753                          *flags |= QUOTA_FL_SYNC;
754                 lqe_write_unlock(lqe);
755
756                 /* convert recoverable error into -EINPROGRESS, client will
757                  * retry */
758                 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
759                     rc == -EAGAIN || rc == -EINTR) {
760                         rc = -EINPROGRESS;
761                 } else if (rc == -ESRCH) {
762                         rc = 0;
763                         LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
764                                      "probably due to a legeal race, if this "
765                                      "message is showing up constantly, there "
766                                      "could be some inconsistence between "
767                                      "master & slave, and quota reintegration "
768                                      "needs be re-triggered.");
769                 }
770         }
771
772         if (flags != NULL) {
773 out_flags:
774                 LASSERT(qid->lqi_is_blk);
775                 if (rc != 0) {
776                         *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
777                 } else {
778                         __u64   usage;
779
780                         lqe_read_lock(lqe);
781                         usage  = lqe->lqe_usage;
782                         usage += lqe->lqe_pending_write;
783                         usage += lqe->lqe_waiting_write;
784                         usage += qqi->qqi_qsd->qsd_sync_threshold;
785
786                         /* if we should notify client to start sync write */
787                         if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
788                                 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
789                         else
790                                 *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
791                         lqe_read_unlock(lqe);
792                 }
793         }
794         RETURN(rc);
795 }
796
797 /**
798  * helper function comparing two lquota_id_info structures
799  */
800 static inline bool qid_equal(struct lquota_id_info *q1,
801                              struct lquota_id_info *q2)
802 {
803         if (q1->lqi_type != q2->lqi_type)
804                 return false;
805         return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
806 }
807
808 /**
809  * Enforce quota, it's called in the declaration of each operation.
810  * qsd_op_end() will then be called later once all the operations have been
811  * completed in order to release/adjust the quota space.
812  *
813  * \param env   - the environment passed by the caller
814  * \param qsd   - is the qsd instance associated with the device in charge of
815  *                the operation.
816  * \param trans - is the quota transaction information
817  * \param qi    - qid & space required by current operation
818  * \param flags - if the operation is write, return caller no user/group and
819  *                sync commit flags
820  *
821  * \retval 0            - success
822  * \retval -EDQUOT      - out of quota
823  * \retval -EINPROGRESS - inform client to retry write
824  * \retval -ve          - other appropriate errors
825  */
826 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
827                  struct lquota_trans *trans, struct lquota_id_info *qi,
828                  int *flags)
829 {
830         int     i, rc;
831         bool    found = false;
832         ENTRY;
833
834         if (unlikely(qsd == NULL))
835                 RETURN(0);
836
837         /* We don't enforce quota until the qsd_instance is started */
838         read_lock(&qsd->qsd_lock);
839         if (!qsd->qsd_started) {
840                 read_unlock(&qsd->qsd_lock);
841                 RETURN(0);
842         }
843         read_unlock(&qsd->qsd_lock);
844
845         /* ignore block quota on MDTs, ignore inode quota on OSTs */
846         if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
847             (qsd->qsd_is_md && qi->lqi_is_blk))
848                 RETURN(0);
849
850         /* ignore quota enforcement request when:
851          *    - quota isn't enforced for this quota type
852          * or - the user/group is root */
853         if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
854                 RETURN(0);
855
856         LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
857                  trans->lqt_id_cnt);
858         /* check whether we already allocated a slot for this id */
859         for (i = 0; i < trans->lqt_id_cnt; i++) {
860                 if (qid_equal(qi, &trans->lqt_ids[i])) {
861                         found = true;
862                         /* make sure we are not mixing inodes & blocks */
863                         LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
864                         break;
865                 }
866         }
867
868         if (!found) {
869                 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
870                         CERROR("%s: more than %d qids enforced for a "
871                                "transaction?\n", qsd->qsd_svname, i);
872                         RETURN(-EINVAL);
873                 }
874
875                 /* fill new slot */
876                 trans->lqt_ids[i].lqi_id     = qi->lqi_id;
877                 trans->lqt_ids[i].lqi_type   = qi->lqi_type;
878                 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
879                 trans->lqt_id_cnt++;
880         }
881
882         /* manage quota enforcement for this ID */
883         rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
884                            &trans->lqt_ids[i], qi->lqi_space, flags);
885         RETURN(rc);
886 }
887 EXPORT_SYMBOL(qsd_op_begin);
888
/**
 * Adjust quota space (by acquiring or releasing) held by the quota slave.
 * This function is called after each quota request completion and during
 * reintegration in order to report usage or re-acquire quota locks.
 * Space adjustment is aborted if there is already a quota request in flight
 * for this ID.
 *
 * \param env    - the environment passed by the caller
 * \param lqe    - is the qid entry to be processed
 *
 * \retval 0 on success, appropriate errors on failure
 */
int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
{
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
        int                      rc;
        bool                     intent = false;
        ENTRY;

        memset(qbody, 0, sizeof(*qbody));
        /* check connectivity/lock state with the master; also fills in the
         * global lock handle used in the quota body */
        rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
        if (rc) {
                /* add to adjust list again to trigger adjustment later when
                 * slave is ready */
                LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
                qsd_adjust_schedule(lqe, true, false);
                RETURN(0);
        }

        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;

        lqe_write_lock(lqe);

        /* fill qb_count & qb_flags */
        if (!qsd_calc_adjust(lqe, qbody)) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "no adjustment required");
                RETURN(0);
        }

        /* only 1 quota request in flight for a given ID is allowed */
        rc = qsd_request_enter(lqe);
        if (rc) {
                /* already a request in flight, space adjustment will be run
                 * again on request completion */
                lqe_write_unlock(lqe);
                RETURN(0);
        }

        /* record how much space we are releasing so that completion can
         * update the entry consistently */
        if (req_is_rel(qbody->qb_flags))
                lqe->lqe_pending_rel = qbody->qb_count;
        /* snapshot the per-ID lock handle while holding the entry lock */
        lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
        lqe_write_unlock(lqe);

        /* hold a refcount until completion */
        lqe_getref(lqe);

        /* fill other quota body fields */
        qbody->qb_fid = qqi->qqi_fid;
        qbody->qb_id  = lqe->lqe_id;

        if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
                /* check whether we own a valid lock for this ID */
                rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
                if (rc) {
                        memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
                        if (req_is_preacq(qbody->qb_flags)) {
                                if (req_has_rep(qbody->qb_flags))
                                        /* still want to report usage */
                                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                                else
                                        /* no pre-acquire if no per-ID lock */
                                        GOTO(out, rc = -ENOLCK);
                        } else {
                                /* no lock found, should use intent */
                                intent = true;
                        }
                } else if (req_is_acq(qbody->qb_flags) &&
                           qbody->qb_count == 0) {
                        /* found cached lock, no need to acquire */
                        GOTO(out, rc = 0);
                }
        } else {
                /* release and report don't need a per-ID lock */
                memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
        }

        if (!intent) {
                /* we own a per-ID lock (or don't need one), send a regular
                 * DQACQ request */
                rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
                                    qsd_req_completion, qqi, &qti->qti_lockh,
                                    lqe);
        } else {
                /* no per-ID lock owned, use an intent request instead */
                union ldlm_wire_lvb *lvb;

                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL)
                        GOTO(out, rc = -ENOMEM);

                rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
                                     IT_QUOTA_DQACQ, qsd_req_completion,
                                     qqi, lvb, (void *)lqe);
        }
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
out:
        /* request was never sent: run the completion callback ourselves so
         * the in-flight marker and the lqe reference taken above are
         * released */
        qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
                           rc);
        return rc;
}
1003
1004 /**
1005  * Post quota operation, pre-acquire/release quota from master.
1006  *
1007  * \param  env  - the environment passed by the caller
1008  * \param  qsd  - is the qsd instance attached to the OSD device which
1009  *                is handling the operation.
1010  * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
1011  *                subject to the operation
1012  * \param  qid  - stores information related to his ID for the operation
1013  *                which has just completed
1014  *
1015  * \retval 0    - success
1016  * \retval -ve  - failure
1017  */
1018 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
1019                         struct lquota_id_info *qid)
1020 {
1021         struct lquota_entry     *lqe;
1022         bool                     adjust;
1023         ENTRY;
1024
1025         lqe = qid->lqi_qentry;
1026         if (lqe == NULL)
1027                 RETURN_EXIT;
1028         qid->lqi_qentry = NULL;
1029
1030         /* refresh cached usage if a suitable environment is passed */
1031         if (env != NULL)
1032                 qsd_refresh_usage(env, lqe);
1033
1034         lqe_write_lock(lqe);
1035         if (qid->lqi_space > 0)
1036                 lqe->lqe_pending_write -= qid->lqi_space;
1037         if (env != NULL)
1038                 adjust = qsd_adjust_needed(lqe);
1039         else
1040                 adjust = true;
1041         lqe_write_unlock(lqe);
1042
1043         if (adjust) {
1044                 /* pre-acquire/release quota space is needed */
1045                 if (env != NULL)
1046                         qsd_adjust(env, lqe);
1047                 else
1048                         /* no suitable environment, handle adjustment in
1049                          * separate thread context */
1050                         qsd_adjust_schedule(lqe, false, false);
1051         }
1052         lqe_putref(lqe);
1053         EXIT;
1054 }
1055
1056 /**
1057  * Post quota operation. It's called after each operation transaction stopped.
1058  *
1059  * \param  env   - the environment passed by the caller
1060  * \param  qsd   - is the qsd instance associated with device which is handling
1061  *                 the operation.
1062  * \param  qids  - all qids information attached in the transaction handle
1063  * \param  count - is the number of qid entries in the qids array.
1064  *
1065  * \retval 0     - success
1066  * \retval -ve   - failure
1067  */
1068 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
1069                 struct lquota_trans *trans)
1070 {
1071         int i;
1072         ENTRY;
1073
1074         if (unlikely(qsd == NULL))
1075                 RETURN_EXIT;
1076
1077         /* We don't enforce quota until the qsd_instance is started */
1078         read_lock(&qsd->qsd_lock);
1079         if (!qsd->qsd_started) {
1080                 read_unlock(&qsd->qsd_lock);
1081                 RETURN_EXIT;
1082         }
1083         read_unlock(&qsd->qsd_lock);
1084
1085         LASSERT(trans != NULL);
1086
1087         for (i = 0; i < trans->lqt_id_cnt; i++) {
1088                 struct qsd_qtype_info *qqi;
1089
1090                 if (trans->lqt_ids[i].lqi_qentry == NULL)
1091                         continue;
1092
1093                 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
1094                 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
1095         }
1096
1097         /* reset id_count to 0 so that a second accidental call to qsd_op_end()
1098          * does not result in failure */
1099         trans->lqt_id_cnt = 0;
1100         EXIT;
1101 }
1102 EXPORT_SYMBOL(qsd_op_end);
1103
1104 /**
1105  * Trigger pre-acquire/release if necessary.
1106  * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
1107  * quota accounting isn't updated when the transaction stopped. Instead, it'll
1108  * be updated on the final iput, so qsd_op_adjust() will be called then (in
1109  * osd_object_delete()) to trigger quota release if necessary.
1110  *
1111  * \param env - the environment passed by the caller
1112  * \param qsd - is the qsd instance associated with the device in charge
1113  *              of the operation.
1114  * \param qid - is the lquota ID of the user/group for which to trigger
1115  *              quota space adjustment
1116  * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
1117  */
1118 void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
1119                    union lquota_id *qid, int qtype)
1120 {
1121         struct lquota_entry    *lqe;
1122         struct qsd_qtype_info  *qqi;
1123         bool                    adjust;
1124         ENTRY;
1125
1126         if (unlikely(qsd == NULL))
1127                 RETURN_EXIT;
1128
1129         /* We don't enforce quota until the qsd_instance is started */
1130         read_lock(&qsd->qsd_lock);
1131         if (!qsd->qsd_started) {
1132                 read_unlock(&qsd->qsd_lock);
1133                 RETURN_EXIT;
1134         }
1135         read_unlock(&qsd->qsd_lock);
1136
1137         qqi = qsd->qsd_type_array[qtype];
1138         LASSERT(qqi);
1139
1140         if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
1141             qid->qid_uid == 0)
1142                 RETURN_EXIT;
1143
1144         read_lock(&qsd->qsd_lock);
1145         if (!qsd->qsd_started) {
1146                 read_unlock(&qsd->qsd_lock);
1147                 RETURN_EXIT;
1148         }
1149         read_unlock(&qsd->qsd_lock);
1150
1151         lqe = lqe_locate(env, qqi->qqi_site, qid);
1152         if (IS_ERR(lqe)) {
1153                 CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
1154                        qsd->qsd_svname, qid->qid_uid, qtype);
1155                 RETURN_EXIT;
1156         }
1157
1158         qsd_refresh_usage(env, lqe);
1159
1160         lqe_read_lock(lqe);
1161         adjust = qsd_adjust_needed(lqe);
1162         lqe_read_unlock(lqe);
1163
1164         if (adjust)
1165                 qsd_adjust(env, lqe);
1166
1167         lqe_putref(lqe);
1168         EXIT;
1169 }
1170 EXPORT_SYMBOL(qsd_op_adjust);