Whamcloud - gitweb
b1ec4a09998370d492847e3ca905f7d0382dab00
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <linux/version.h>
38 #include <linux/fs.h>
39 #include <asm/unistd.h>
40 #include <linux/quotaops.h>
41 #include <linux/init.h>
42
43 #include <obd_class.h>
44 #include <lustre_param.h>
45 #include <lprocfs_status.h>
46
47 #include "qsd_internal.h"
48
49 /*
50  * helper function returning how much space is currently reserved for requests
51  * in flight.
52  */
53 static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
54 {
55         int     pending;
56
57         lqe_read_lock(lqe);
58         pending = lqe->lqe_pending_req;
59         lqe_read_unlock(lqe);
60
61         return pending;
62 }
63
64 /*
65  * helper function returning true when the connection to master is ready to be
66  * used.
67  */
68 static inline int qsd_ready(struct qsd_instance *qsd)
69 {
70         struct obd_import       *imp = NULL;
71
72         cfs_read_lock(&qsd->qsd_lock);
73         if (qsd->qsd_exp_valid)
74                 imp = class_exp2cliimp(qsd->qsd_exp);
75         cfs_read_unlock(&qsd->qsd_lock);
76
77         return (imp == NULL || imp->imp_invalid) ? false : true;
78 }
79
80 /*
81  * Helper function returning true when quota space need to be adjusted (some
82  * unused space should be free or pre-acquire) and false otherwise.
83  */
84 static bool qsd_adjust_needed(struct lquota_entry *lqe)
85 {
86         struct qsd_qtype_info   *qqi;
87         __u64                    usage, granted;
88
89         qqi = lqe2qqi(lqe);
90
91         if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
92                 /* if quota isn't enforced for this id, no need to adjust
93                  * Similarly, no need to perform adjustment if the target is in
94                  * the process of shutting down. */
95                 return false;
96
97         usage  = lqe->lqe_usage;
98         usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
99         granted = lqe->lqe_granted - lqe->lqe_pending_rel;
100
101         /* need to re-acquire per-ID lock or release all grant */
102         if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
103             lqe->lqe_granted > lqe->lqe_usage)
104                 return true;
105
106         /* good old quota qunit adjustment logic which has been around since
107          * lustre 1.4:
108          * 1. Need to release some space? */
109         if (granted > usage + lqe->lqe_qunit)
110                 return true;
111
112         /* 2. Any quota overrun? */
113         if (lqe->lqe_usage > lqe->lqe_granted)
114                 /* we ended up consuming more than we own, we need to have this
115                  * fixed ASAP */
116                 return true;
117
118         /* 3. Time to pre-acquire? */
119         if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
120             granted < usage + lqe->lqe_qtune)
121                 /* need to pre-acquire some space if we don't want to block
122                  * client's requests */
123                 return true;
124
125         return false;
126 }
127
128 /*
129  * Callback function called when an acquire/release request sent to the master
130  * is completed
131  */
132 static void qsd_dqacq_completion(const struct lu_env *env,
133                                  struct qsd_qtype_info *qqi,
134                                  struct quota_body *reqbody,
135                                  struct quota_body *repbody,
136                                  struct lustre_handle *lockh,
137                                  union ldlm_wire_lvb *lvb,
138                                  void *arg, int ret)
139 {
140         struct lquota_entry     *lqe = (struct lquota_entry *)arg;
141         struct qsd_thread_info  *qti;
142         int                      rc;
143         bool                     adjust = false, cancel = false;
144         ENTRY;
145
146         LASSERT(qqi != NULL && lqe != NULL);
147
148         /* environment passed by ptlrpcd is mostly used by CLIO and hasn't the
149          * DT tags set. */
150         rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
151         if (rc) {
152                 LQUOTA_ERROR(lqe, "failed to refill environmnent %d", rc);
153                 lqe_write_lock(lqe);
154                 /* can't afford to adjust quota space with no suitable lu_env */
155                 GOTO(out_noadjust, rc);
156         }
157         qti = qsd_info(env);
158
159         lqe_write_lock(lqe);
160
161         if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
162                 LQUOTA_ERROR(lqe, "DQACQ failed with %d, op:%x", ret,
163                              reqbody->qb_flags);
164                 GOTO(out, ret);
165         }
166
167         /* despite -EDQUOT & -EINPROGRESS errors, the master might still
168          * grant us back quota space to adjust quota overrun */
169
170         LQUOTA_DEBUG(lqe, "DQACQ returned %d", ret);
171
172         /* Set the lqe_lockh */
173         if (lustre_handle_is_used(lockh) &&
174             !lustre_handle_equal(lockh, &lqe->lqe_lockh))
175                 lustre_handle_copy(&lqe->lqe_lockh, lockh);
176
177         /* If the replied qb_count is zero, it means master didn't process
178          * the DQACQ since the limit for this ID has been removed, so we
179          * should not update quota entry & slave index copy neither. */
180         if (repbody != NULL && repbody->qb_count != 0) {
181                 LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
182
183                 if (req_is_rel(reqbody->qb_flags)) {
184                         if (lqe->lqe_granted < repbody->qb_count) {
185                                 LQUOTA_ERROR(lqe, "can't release more space "
186                                              "than owned "LPU64"<"LPU64,
187                                              lqe->lqe_granted,
188                                              repbody->qb_count);
189                                 lqe->lqe_granted = 0;
190                         } else {
191                                 lqe->lqe_granted -= repbody->qb_count;
192                         }
193                         /* Cancel the per-ID lock initiatively when there
194                          * isn't any usage & grant, which can avoid master
195                          * sending glimpse unnecessarily to this slave on
196                          * quota revoking */
197                         if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
198                             !lqe->lqe_waiting_write && !lqe->lqe_usage)
199                                 cancel = true;
200                 } else {
201                         lqe->lqe_granted += repbody->qb_count;
202                 }
203                 qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
204                 lqe_write_unlock(lqe);
205
206                 /* Update the slave index file in the dedicated thread. So far,
207                  * We don't update the version of slave index copy on DQACQ.
208                  * No locking is necessary since nobody can change
209                  * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
210                 qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
211                                  false);
212                 lqe_write_lock(lqe);
213         }
214
215         /* extract information from lvb */
216         if (ret == 0 && lvb != 0) {
217                 if (lvb->l_lquota.lvb_id_qunit != 0)
218                         qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
219                 if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
220                         lqe->lqe_edquot = true;
221                 else
222                         lqe->lqe_edquot = false;
223         } else if (repbody != NULL && repbody->qb_qunit != 0) {
224                 qsd_set_qunit(lqe, repbody->qb_qunit);
225         }
226
227         /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
228          * flooding the master with acquire request. Pre-acquire will be turned
229          * on again as soon as qunit is modified */
230         if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
231                 lqe->lqe_nopreacq = true;
232 out:
233         adjust = qsd_adjust_needed(lqe);
234 out_noadjust:
235         lqe->lqe_pending_req--;
236         lqe->lqe_pending_rel = 0;
237         lqe_write_unlock(lqe);
238
239         cfs_waitq_broadcast(&lqe->lqe_waiters);
240
241         /* release reference on per-ID lock */
242         if (lustre_handle_is_used(lockh))
243                 ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
244
245         if (cancel) {
246                 qsd_adjust_schedule(lqe, false, true);
247         } else if (adjust) {
248                 if (!ret || ret == -EDQUOT)
249                         qsd_adjust_schedule(lqe, false, false);
250                 else
251                         qsd_adjust_schedule(lqe, true, false);
252         }
253
254         if (lvb)
255                 /* free lvb allocated in qsd_dqacq */
256                 OBD_FREE_PTR(lvb);
257
258         lqe_putref(lqe);
259         EXIT;
260 }
261
262 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
263 {
264         __u64   usage;
265         int     rc;
266         ENTRY;
267
268         if (!lqe->lqe_enforced)
269                 /* not enforced any more, we are good */
270                 RETURN(0);
271
272         lqe_write_lock(lqe);
273         /* use latest usage */
274         usage = lqe->lqe_usage;
275         /* take pending write into account */
276         usage += lqe->lqe_pending_write;
277
278         if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
279                 /* Yay! we got enough space */
280                 lqe->lqe_pending_write += space;
281                 lqe->lqe_waiting_write -= space;
282                 rc = 0;
283         } else if (lqe->lqe_edquot) {
284                 rc = -EDQUOT;
285         } else {
286                 rc = -EAGAIN;
287         }
288         lqe_write_unlock(lqe);
289
290         RETURN(rc);
291 }
292
293 static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
294                            struct quota_body *qbody)
295 {
296         struct qsd_qtype_info   *qqi;
297         __u64                    usage, granted;
298
299         if (!lqe->lqe_enforced && op != QSD_REL)
300                 return 0;
301
302         qqi = lqe2qqi(lqe);
303
304         LASSERT(lqe->lqe_pending_rel == 0);
305         usage   = lqe->lqe_usage;
306         usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
307         granted = lqe->lqe_granted;
308
309         qbody->qb_flags = 0;
310 again:
311         switch (op) {
312         case QSD_ACQ:
313                 /* if we overconsumed quota space, we report usage in request
314                  * so that master can adjust it unconditionally */
315                 if (lqe->lqe_usage > lqe->lqe_granted) {
316                         qbody->qb_usage = lqe->lqe_usage;
317                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
318                         granted = lqe->lqe_usage;
319                 }
320                 /* acquire as much as needed, but not more */
321                 if (usage > granted) {
322                         qbody->qb_count  = usage - granted;
323                         qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
324                 }
325                 break;
326         case QSD_REP:
327                 /* When reporting quota (during reintegration or on setquota
328                  * glimpse), we should release granted space if usage is 0.
329                  * Otherwise, if the usage is less than granted, we need to
330                  * acquire the per-ID lock to make sure the unused grant can be
331                  * reclaimed by per-ID lock glimpse. */
332                 if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
333                         LQUOTA_DEBUG(lqe, "Release on report!");
334                         GOTO(again, op = QSD_REL);
335                 } else if (lqe->lqe_usage == lqe->lqe_granted) {
336                         LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
337                                      "anything on report!");
338                 } else if (lqe->lqe_usage < lqe->lqe_granted) {
339                         LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
340                         qbody->qb_count = 0;
341                         qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
342                 } else {
343                         LASSERT(lqe->lqe_usage > lqe->lqe_granted);
344                         LQUOTA_DEBUG(lqe, "Reporting usage");
345                         qbody->qb_usage = lqe->lqe_usage;
346                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
347                 }
348                 break;
349         case QSD_REL:
350                 /* release unused quota space unconditionally */
351                 if (lqe->lqe_granted > lqe->lqe_usage) {
352                         qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
353                         qbody->qb_flags = QUOTA_DQACQ_FL_REL;
354                 }
355                 break;
356         case QSD_ADJ: {
357                 /* need to re-acquire per-ID lock or release all grant */
358                 if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
359                     lqe->lqe_granted > lqe->lqe_usage)
360                         GOTO(again, op = QSD_REP);
361
362                 /* release spare grant */
363                 if (granted > usage + lqe->lqe_qunit) {
364                         /* pre-release quota space */
365                         qbody->qb_count  = granted - usage;
366                         /* if usage == 0, release all granted space */
367                         if (usage) {
368                                 /* try to keep one qunit of quota space */
369                                 qbody->qb_count -= lqe->lqe_qunit;
370                                 /* but don't release less than qtune to avoid
371                                  * releasing space too often */
372                                 if (qbody->qb_count < lqe->lqe_qtune)
373                                         qbody->qb_count = lqe->lqe_qtune;
374                         }
375                         qbody->qb_flags = QUOTA_DQACQ_FL_REL;
376                         break;
377                 }
378
379                 /* if we overconsumed quota space, we report usage in request
380                  * so that master can adjust it unconditionally */
381                 if (lqe->lqe_usage > lqe->lqe_granted) {
382                         qbody->qb_usage = lqe->lqe_usage;
383                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
384                         granted         = lqe->lqe_usage;
385                 }
386
387                 if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
388                     lustre_handle_is_used(&lqe->lqe_lockh) &&
389                     lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
390                         /* To pre-acquire quota space, we report how much spare
391                          * quota space the slave currently owns, then the master
392                          * will grant us back how much we can pretend given the
393                          * current state of affairs */
394                         if (granted <= usage)
395                                 qbody->qb_count = 0;
396                         else
397                                 qbody->qb_count = granted - usage;
398                         qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
399                 }
400                 break;
401         }
402         default:
403                 CERROR("Invalid qsd operation:%u\n", op);
404                 LBUG();
405                 break;
406         }
407         return qbody->qb_flags != 0;
408 }
409
410 /*
411  * Acquire/release quota space from master.
412  * There are at most 1 in-flight dqacq/dqrel.
413  *
414  * \param env    - the environment passed by the caller
415  * \param lqe    - is the qid entry to be processed
416  * \param op     - operation that want to be performed by the caller
417  *
418  * \retval 0     - success
419  * \retval -EDQUOT      : out of quota
420  *         -EINPROGRESS : inform client to retry write/create
421  *         -ve          : other appropriate errors
422  */
423 int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
424               enum qsd_ops op)
425 {
426         struct qsd_thread_info  *qti = qsd_info(env);
427         struct quota_body       *qbody = &qti->qti_body;
428         struct qsd_instance     *qsd;
429         struct qsd_qtype_info   *qqi;
430         struct ldlm_lock        *lock;
431         int                      rc;
432         bool                     intent = false, sync;
433         ENTRY;
434
435         qqi = lqe2qqi(lqe);
436         qsd = qqi->qqi_qsd;
437
438         if (qsd->qsd_stopping) {
439                 LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
440                 /* Target is about to shut down, client will retry */
441                 RETURN(-EINPROGRESS);
442         }
443
444         if (!qsd_ready(qsd)) {
445                 LQUOTA_DEBUG(lqe, "Connection to master not ready");
446                 RETURN(-ENOTCONN);
447         }
448
449         /* In most case, reintegration must have been triggered (when enable
450          * quota or on OST start), however, in rare race condition (enabling
451          * quota when starting OSTs), we might miss triggering reintegration
452          * for some qqi.
453          *
454          * If the previous reintegration failed for some reason, we'll
455          * re-trigger it here as well. */
456         if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
457                 LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
458                              " off reintegration");
459                 qsd_start_reint_thread(qqi);
460                 RETURN(-EINPROGRESS);
461         }
462
463         LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
464
465         /* Fill the remote global lock handle, master will check this handle
466          * to see if the slave is sending request with stale lock */
467         cfs_read_lock(&qsd->qsd_lock);
468         lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
469         cfs_read_unlock(&qsd->qsd_lock);
470
471         if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
472                 RETURN(-ENOLCK);
473
474         lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
475         if (lock == NULL)
476                 RETURN(-ENOLCK);
477         lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
478         LDLM_LOCK_PUT(lock);
479
480         /* We allow only one in-flight dqacq/dqrel for specified qid, if
481          * there is already in-flight dqacq/dqrel:
482          *
483          * - For QSD_ADJ: we should just abort it, since local limit is going
484          *   to be changed soon;
485          * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
486          *   finished, and return success to the caller. The caller is
487          *   responsible for retrying;
488          * - For QSD_REP: we should just abort it, since slave has already
489          *   acquired/released grant; */
490         sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
491         LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
492                  lqe->lqe_pending_req);
493
494         lqe_write_lock(lqe);
495         if (lqe->lqe_pending_req != 0) {
496                 struct l_wait_info lwi = { 0 };
497
498                 lqe_write_unlock(lqe);
499                 if (!sync) {
500                         LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
501                         RETURN(0);
502                 }
503
504                 LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
505                 l_wait_event(lqe->lqe_waiters,
506                              !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
507                              &lwi);
508                 RETURN(0);
509         }
510
511         /* fill qb_count & qb_flags */
512         if (!qsd_calc_space(lqe, op, qbody)) {
513                 lqe_write_unlock(lqe);
514                 LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
515                 RETURN(0);
516         }
517         lqe->lqe_pending_req++;
518         lqe_write_unlock(lqe);
519
520         /* fill other quota body fields */
521         qbody->qb_fid = qqi->qqi_fid;
522         qbody->qb_id  = lqe->lqe_id;
523         memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
524         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
525
526         /* hold a refcount until completion */
527         lqe_getref(lqe);
528
529         if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
530                 /* check whether we already own a lock for this ID */
531                 lqe_read_lock(lqe);
532                 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
533                 lqe_read_unlock(lqe);
534
535                 rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
536                 if (rc) {
537                         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
538                         if (req_is_preacq(qbody->qb_flags)) {
539                                 if (req_has_rep(qbody->qb_flags))
540                                         /* still want to report usage */
541                                         qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
542                                 else
543                                         /* no pre-acquire if no per-ID lock */
544                                         GOTO(out, rc = -ENOLCK);
545                         } else {
546                                 /* no lock found, should use intent */
547                                 intent = true;
548                         }
549                 } else if (req_is_acq(qbody->qb_flags) &&
550                            qbody->qb_count == 0) {
551                         /* found cached lock, no need to acquire */
552                         GOTO(out, rc = 0);
553                 }
554         }
555
556         if (!intent) {
557                 rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
558                                     qsd_dqacq_completion, qqi, &qti->qti_lockh,
559                                     lqe);
560         } else {
561                 union ldlm_wire_lvb *lvb;
562
563                 OBD_ALLOC_PTR(lvb);
564                 if (lvb == NULL)
565                         GOTO(out, rc = -ENOMEM);
566
567                 rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
568                                      IT_QUOTA_DQACQ, qsd_dqacq_completion,
569                                      qqi, lvb, (void *)lqe);
570         }
571         /* the completion function will be called by qsd_send_dqacq or
572          * qsd_intent_lock */
573         RETURN(rc);
574 out:
575         qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
576                              rc);
577         return rc;
578 }
579
580 /*
581  * Quota enforcement handler. If local quota can satisfy this operation,
582  * return success, otherwise, acquire more quota from master.
583  * (for write operation, if master isn't available at this moment, return
584  * -EINPROGRESS to inform client to retry the write)
585  *
586  * \param env   - the environment passed by the caller
587  * \param qsd   - is the qsd instance associated with the device in charge
588  *                of the operation.
589  * \param qid   - is the qid information attached in the transaction handle
590  * \param space - is the space required by the operation
591  * \param flags - if the operation is write, return caller no user/group
592  *                and sync commit flags
593  *
594  * \retval 0        - success
595  * \retval -EDQUOT      : out of quota
596  *         -EINPROGRESS : inform client to retry write
597  *         -ve          : other appropriate errors
598  */
599 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
600                          struct lquota_id_info *qid, long long space,
601                          int *flags)
602 {
603         struct lquota_entry *lqe;
604         int                  rc = 0, retry_cnt;
605         ENTRY;
606
607         if (qid->lqi_qentry != NULL) {
608                 /* we already had to deal with this id for this transaction */
609                 lqe = qid->lqi_qentry;
610                 if (!lqe->lqe_enforced)
611                         RETURN(0);
612         } else {
613                 /* look up lquota entry associated with qid */
614                 lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
615                 if (IS_ERR(lqe))
616                         RETURN(PTR_ERR(lqe));
617                 if (!lqe->lqe_enforced) {
618                         lqe_putref(lqe);
619                         RETURN(0);
620                 }
621                 qid->lqi_qentry = lqe;
622                 /* lqe will be released in qsd_op_end() */
623         }
624
625         if (space <= 0) {
626                 /* when space is negative or null, we don't need to consume
627                  * quota space. That said, we still want to perform space
628                  * adjustments in qsd_op_end, so we return here, but with
629                  * a reference on the lqe */
630                 if (flags != NULL) {
631                         rc = qsd_refresh_usage(env, lqe);
632                         GOTO(out_flags, rc);
633                 }
634                 RETURN(0);
635         }
636
637         LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
638
639         lqe_write_lock(lqe);
640         lqe->lqe_waiting_write += space;
641         lqe_write_unlock(lqe);
642
643         for (retry_cnt = 0; rc == 0; retry_cnt++) {
644                 /* refresh disk usage if required */
645                 rc = qsd_refresh_usage(env, lqe);
646                 if (rc)
647                         break;
648
649                 /* try to consume local quota space */
650                 rc = qsd_acquire_local(lqe, space);
651                 if (rc != -EAGAIN)
652                         /* rc == 0, Wouhou! enough local quota space
653                          * rc < 0, something bad happened */
654                         break;
655
656                 /* need to acquire more quota space from master, this is done
657                  * synchronously */
658                 rc = qsd_dqacq(env, lqe, QSD_ACQ);
659                 LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
660                              retry_cnt, rc);
661         }
662
663         if (rc == 0) {
664                 qid->lqi_space += space;
665         } else {
666                 LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
667
668                 lqe_write_lock(lqe);
669                 lqe->lqe_waiting_write -= space;
670
671                 if (flags && lqe->lqe_pending_write != 0)
672                         /* Inform OSD layer that there are pending writes.
673                          * It might want to retry after a sync if appropriate */
674                          *flags |= QUOTA_FL_SYNC;
675                 lqe_write_unlock(lqe);
676
677                 /* convert recoverable error into -EINPROGRESS, and client will
678                  * retry write on -EINPROGRESS. */
679                 if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
680                     rc == -EAGAIN || rc == -EINTR)
681                         rc = -EINPROGRESS;
682         }
683
684         if (flags != NULL) {
685 out_flags:
686                 LASSERT(qid->lqi_is_blk);
687                 if (rc != 0) {
688                         *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
689                 } else {
690                         __u64   usage;
691
692                         lqe_read_lock(lqe);
693                         usage  = lqe->lqe_usage;
694                         usage += lqe->lqe_pending_write;
695                         usage += lqe->lqe_waiting_write;
696                         usage += qqi->qqi_qsd->qsd_sync_threshold;
697
698                         /* if we should notify client to start sync write */
699                         if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
700                                 *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
701                         else
702                                 *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
703                         lqe_read_unlock(lqe);
704                 }
705         }
706         RETURN(rc);
707 }
708
709 static inline bool qid_equal(struct lquota_id_info *q1,
710                              struct lquota_id_info *q2)
711 {
712         if (q1->lqi_type != q2->lqi_type)
713                 return false;
714         return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
715 }
716
717 /*
718  * Enforce quota, it's called in the declaration of each operation.
719  * qsd_op_end() will then be called later once all the operations have been
720  * completed in order to release/adjust the quota space.
721  *
722  * \param env        - the environment passed by the caller
723  * \param qsd        - is the qsd instance associated with the device in charge
724  *                     of the operation.
725  * \param trans      - is the quota transaction information
726  * \param qi         - qid & space required by current operation
727  * \param flags      - if the operation is write, return caller no user/group
728  *                     and sync commit flags
729  *
730  * \retval 0        - success
731  * \retval -EDQUOT      : out of quota
732  *         -EINPROGRESS : inform client to retry write
733  *         -ve          : other appropriate errors
734  */
735 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
736                  struct lquota_trans *trans, struct lquota_id_info *qi,
737                  int *flags)
738 {
739         struct qsd_qtype_info *qqi;
740         int                    i, rc;
741         bool                   found = false;
742         ENTRY;
743
744         if (unlikely(qsd == NULL))
745                 RETURN(0);
746
747         /* We don't enforce quota until the qsd_instance is started */
748         cfs_read_lock(&qsd->qsd_lock);
749         if (!qsd->qsd_started) {
750                 cfs_read_unlock(&qsd->qsd_lock);
751                 RETURN(0);
752         }
753         cfs_read_unlock(&qsd->qsd_lock);
754
755         /* ignore block quota on MDTs, ignore inode quota on OSTs */
756         if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
757             (qsd->qsd_is_md && qi->lqi_is_blk))
758                 RETURN(0);
759
760         qqi = qsd->qsd_type_array[qi->lqi_type];
761
762         /* ignore quota enforcement request when:
763          *    - quota isn't enforced for this quota type
764          * or - we failed to access the accounting object for this quota type
765          * or - the space to acquire is null
766          * or - the user/group is root */
767         if (!qsd_type_enabled(qsd, qi->lqi_type) || qqi->qqi_acct_obj == NULL ||
768             qi->lqi_id.qid_uid == 0)
769                 RETURN(0);
770
771         LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
772                  trans->lqt_id_cnt);
773         /* check whether we already allocated a slot for this id */
774         for (i = 0; i < trans->lqt_id_cnt; i++) {
775                 if (qid_equal(qi, &trans->lqt_ids[i])) {
776                         found = true;
777                         /* make sure we are not mixing inodes & blocks */
778                         LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
779                         break;
780                 }
781         }
782
783         if (!found) {
784                 if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
785                         CERROR("%s: more than %d qids enforced for a "
786                                "transaction?\n", qsd->qsd_svname, i);
787                         RETURN(-EINVAL);
788                 }
789
790                 /* fill new slot */
791                 trans->lqt_ids[i].lqi_id     = qi->lqi_id;
792                 trans->lqt_ids[i].lqi_type   = qi->lqi_type;
793                 trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
794                 trans->lqt_id_cnt++;
795         }
796
797         /* manage quota enforcement for this ID */
798         rc = qsd_op_begin0(env, qqi, &trans->lqt_ids[i], qi->lqi_space, flags);
799
800         RETURN(rc);
801 }
802 EXPORT_SYMBOL(qsd_op_begin);
803
804 /**
805  * Post quota operation, pre-acquire/release quota from master.
806  *
807  * \param  env  - the environment passed by the caller
808  * \param  qsd  - is the qsd instance attached to the OSD device which
809  *                is handling the operation.
810  * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
811  *                subject to the operation
812  * \param  qid  - stores information related to his ID for the operation
813  *                which has just completed
814  *
815  * \retval 0    - success
816  * \retval -ve  - failure
817  */
818 static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
819                         struct lquota_id_info *qid)
820 {
821         struct lquota_entry     *lqe;
822         bool                     adjust;
823         ENTRY;
824
825         lqe = qid->lqi_qentry;
826         if (lqe == NULL)
827                 RETURN_EXIT;
828         qid->lqi_qentry = NULL;
829
830         /* refresh cached usage if a suitable environment is passed */
831         if (env != NULL)
832                 qsd_refresh_usage(env, lqe);
833
834         lqe_write_lock(lqe);
835         if (qid->lqi_space > 0)
836                 lqe->lqe_pending_write -= qid->lqi_space;
837         if (env != NULL)
838                 adjust = qsd_adjust_needed(lqe);
839         else
840                 adjust = true;
841         lqe_write_unlock(lqe);
842
843         if (adjust) {
844                 /* pre-acquire/release quota space is needed */
845                 if (env != NULL)
846                         qsd_dqacq(env, lqe, QSD_ADJ);
847                 else
848                         /* no suitable environment, handle adjustment in
849                          * separate thread context */
850                         qsd_adjust_schedule(lqe, false, false);
851         }
852         lqe_putref(lqe);
853         EXIT;
854 }
855
856 /*
857  * Post quota operation. It's called after each operation transaction stopped.
858  *
859  * \param  env   - the environment passed by the caller
860  * \param  qsd   - is the qsd instance associated with device which is handling
861  *                 the operation.
862  * \param  qids  - all qids information attached in the transaction handle
863  * \param  count - is the number of qid entries in the qids array.
864  *
865  * \retval 0     - success
866  * \retval -ve   - failure
867  */
868 void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
869                 struct lquota_trans *trans)
870 {
871         int i;
872         ENTRY;
873
874         if (unlikely(qsd == NULL))
875                 RETURN_EXIT;
876
877         /* We don't enforce quota until the qsd_instance is started */
878         cfs_read_lock(&qsd->qsd_lock);
879         if (!qsd->qsd_started) {
880                 cfs_read_unlock(&qsd->qsd_lock);
881                 RETURN_EXIT;
882         }
883         cfs_read_unlock(&qsd->qsd_lock);
884
885         LASSERT(trans != NULL);
886
887         for (i = 0; i < trans->lqt_id_cnt; i++) {
888                 struct qsd_qtype_info *qqi;
889
890                 if (trans->lqt_ids[i].lqi_qentry == NULL)
891                         continue;
892
893                 qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
894                 qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
895         }
896
897         /* reset id_count to 0 so that a second accidental call to qsd_op_end()
898          * does not result in failure */
899         trans->lqt_id_cnt = 0;
900         EXIT;
901 }
902 EXPORT_SYMBOL(qsd_op_end);
903
904 void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
905                       union lquota_id *qid, int qtype)
906 {
907         struct lquota_entry    *lqe;
908         struct qsd_qtype_info  *qqi;
909         bool                    adjust;
910         ENTRY;
911
912         if (unlikely(qsd == NULL))
913                 RETURN_EXIT;
914
915         /* We don't enforce quota until the qsd_instance is started */
916         cfs_read_lock(&qsd->qsd_lock);
917         if (!qsd->qsd_started) {
918                 cfs_read_unlock(&qsd->qsd_lock);
919                 RETURN_EXIT;
920         }
921         cfs_read_unlock(&qsd->qsd_lock);
922
923         qqi = qsd->qsd_type_array[qtype];
924         LASSERT(qqi);
925
926         if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
927             qid->qid_uid == 0)
928                 RETURN_EXIT;
929
930         cfs_read_lock(&qsd->qsd_lock);
931         if (!qsd->qsd_started) {
932                 cfs_read_unlock(&qsd->qsd_lock);
933                 RETURN_EXIT;
934         }
935         cfs_read_unlock(&qsd->qsd_lock);
936
937         lqe = lqe_locate(env, qqi->qqi_site, qid);
938         if (IS_ERR(lqe)) {
939                 CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
940                        qsd->qsd_svname, qid->qid_uid, qtype);
941                 RETURN_EXIT;
942         }
943
944         qsd_refresh_usage(env, lqe);
945
946         lqe_read_lock(lqe);
947         adjust = qsd_adjust_needed(lqe);
948         lqe_read_unlock(lqe);
949
950         if (adjust)
951                 qsd_dqacq(env, lqe, QSD_ADJ);
952
953         lqe_putref(lqe);
954         EXIT;
955 }
956 EXPORT_SYMBOL(qsd_adjust_quota);