lustre/quota/qsd_handler.c (lustre-release, commit 5bdb0cf2477638c13677ba1e068bf719eb31b611)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012 Intel, Inc.
 * Use is subject to license terms.
 *
 * Author: Johann Lombardi <johann.lombardi@intel.com>
 * Author: Niu    Yawei    <yawei.niu@intel.com>
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_LQUOTA

#include <linux/version.h>
#include <linux/fs.h>
#include <asm/unistd.h>
#include <linux/quotaops.h>
#include <linux/init.h>

#include <obd_class.h>
#include <lustre_param.h>
#include <lprocfs_status.h>

#include "qsd_internal.h"

/*
 * helper function returning how much space is currently reserved for requests
 * in flight.
 */
static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
{
        int     pending;

        lqe_read_lock(lqe);
        pending = lqe->lqe_pending_req;
        lqe_read_unlock(lqe);

        return pending;
}

/*
 * helper function returning true when the connection to master is ready to be
 * used.
 */
static inline int qsd_ready(struct qsd_instance *qsd)
{
        struct obd_import       *imp = NULL;

        cfs_read_lock(&qsd->qsd_lock);
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
        cfs_read_unlock(&qsd->qsd_lock);

        return (imp == NULL || imp->imp_invalid) ? false : true;
}

/*
 * Helper function returning true when quota space needs to be adjusted (some
 * unused space should be freed or pre-acquired) and false otherwise.
 */
static bool qsd_adjust_needed(struct lquota_entry *lqe)
{
        struct qsd_qtype_info   *qqi;
        __u64                    usage, granted;

        qqi = lqe2qqi(lqe);

        if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
                /* if quota isn't enforced for this id, no need to adjust.
                 * Similarly, no need to perform adjustment if the target is in
                 * the process of shutting down. */
                return false;

        usage  = lqe->lqe_usage;
        usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted - lqe->lqe_pending_rel;

        /* need to re-acquire per-ID lock or release all grant */
        if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
            lqe->lqe_granted > lqe->lqe_usage)
                return true;

        /* good old quota qunit adjustment logic which has been around since
         * lustre 1.4:
         * 1. Need to release some space? */
        if (granted > usage + lqe->lqe_qunit)
                return true;

        /* 2. Any quota overrun? */
        if (lqe->lqe_usage > lqe->lqe_granted)
                /* we ended up consuming more than we own; this needs to be
                 * fixed ASAP */
                return true;

        /* 3. Time to pre-acquire? */
        if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
            granted < usage + lqe->lqe_qtune)
                /* need to pre-acquire some space if we don't want to block
                 * client's requests */
                return true;

        return false;
}
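
/*
 * Worked example for the three conditions above (illustrative numbers, not
 * from the code): with lqe_qunit = 1024 and lqe_qtune = 512, a slave holding
 * granted = 4096 against usage + pending/waiting writes = 1024 matches
 * condition 1 (4096 > 1024 + 1024) and should release spare grant.
 * Conversely, granted = 1024 with usage = 896 matches condition 3
 * (1024 < 896 + 512, assuming not over quota and pre-acquire enabled) and
 * should pre-acquire before client writes start blocking.
 */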

/*
 * Callback function called when an acquire/release request sent to the master
 * is completed
 */
static void qsd_dqacq_completion(const struct lu_env *env,
                                 struct qsd_qtype_info *qqi,
                                 struct quota_body *reqbody,
                                 struct quota_body *repbody,
                                 struct lustre_handle *lockh,
                                 union ldlm_wire_lvb *lvb,
                                 void *arg, int ret)
{
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
        struct qsd_thread_info  *qti;
        int                      rc;
        bool                     adjust = false, cancel = false;
        ENTRY;

        LASSERT(qqi != NULL && lqe != NULL);

        /* the environment passed by ptlrpcd is mostly used by CLIO and
         * doesn't have the DT tags set. */
        rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
        if (rc) {
                LQUOTA_ERROR(lqe, "failed to refill environment %d", rc);
                lqe_write_lock(lqe);
                /* can't afford to adjust quota space with no suitable lu_env */
                GOTO(out_noadjust, rc);
        }
        qti = qsd_info(env);

        lqe_write_lock(lqe);

        LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
                     reqbody->qb_flags);

        /* despite -EDQUOT & -EINPROGRESS errors, the master might still
         * grant us back quota space to adjust quota overrun */
        if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
                if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
                    ret != -ESHUTDOWN && ret != -EAGAIN)
                        /* print errors only if return code is unexpected */
                        LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
                                     ret, reqbody->qb_flags);
                GOTO(out, ret);
        }

        /* Set the lqe_lockh */
        if (lustre_handle_is_used(lockh) &&
            !lustre_handle_equal(lockh, &lqe->lqe_lockh))
                lustre_handle_copy(&lqe->lqe_lockh, lockh);

        /* If the replied qb_count is zero, it means the master didn't process
         * the DQACQ since the limit for this ID has been removed, so we
         * should not update the quota entry or the slave index copy either. */
        if (repbody != NULL && repbody->qb_count != 0) {
                LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);

                if (req_is_rel(reqbody->qb_flags)) {
                        if (lqe->lqe_granted < repbody->qb_count) {
                                LQUOTA_ERROR(lqe, "can't release more space "
                                             "than owned "LPU64"<"LPU64,
                                             lqe->lqe_granted,
                                             repbody->qb_count);
                                lqe->lqe_granted = 0;
                        } else {
                                lqe->lqe_granted -= repbody->qb_count;
                        }
                        /* Proactively cancel the per-ID lock when there
                         * isn't any usage & grant, which avoids the master
                         * sending glimpses unnecessarily to this slave on
                         * quota revoking */
                        if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
                            !lqe->lqe_waiting_write && !lqe->lqe_usage)
                                cancel = true;
                } else {
                        lqe->lqe_granted += repbody->qb_count;
                }
                qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
                lqe_write_unlock(lqe);

                /* Update the slave index file in the dedicated thread. So far,
                 * we don't update the version of the slave index copy on
                 * DQACQ. No locking is necessary since nobody can change
                 * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
                qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
                                 false);
                lqe_write_lock(lqe);
        }

        /* extract information from lvb */
        if (ret == 0 && lvb != NULL) {
                if (lvb->l_lquota.lvb_id_qunit != 0)
                        qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
                if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
                        lqe->lqe_edquot = true;
                else
                        lqe->lqe_edquot = false;
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }

        /* turn off pre-acquire if it failed with -EDQUOT. This is done to
         * avoid flooding the master with acquire requests. Pre-acquire will
         * be turned on again as soon as qunit is modified */
        if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
                lqe->lqe_nopreacq = true;
out:
        adjust = qsd_adjust_needed(lqe);
out_noadjust:
        lqe->lqe_pending_req--;
        lqe->lqe_pending_rel = 0;
        lqe_write_unlock(lqe);

        cfs_waitq_broadcast(&lqe->lqe_waiters);

        /* release reference on per-ID lock */
        if (lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);

        if (cancel) {
                qsd_adjust_schedule(lqe, false, true);
        } else if (adjust) {
                if (!ret || ret == -EDQUOT)
                        qsd_adjust_schedule(lqe, false, false);
                else
                        qsd_adjust_schedule(lqe, true, false);
        }

        if (lvb)
                /* free lvb allocated in qsd_dqacq */
                OBD_FREE_PTR(lvb);

        lqe_putref(lqe);
        EXIT;
}
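
/*
 * Bookkeeping sketch for the release path above (hypothetical numbers): if
 * the slave owns lqe_granted = 4096 and the master confirms a release of
 * repbody->qb_count = 1024, lqe_granted drops to 3072. Should the master
 * ever confirm more than the slave owns (e.g. qb_count = 8192), the error
 * is logged and lqe_granted is clamped to 0 rather than allowed to
 * underflow.
 */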

static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
{
        __u64   usage;
        int     rc;
        ENTRY;

        if (!lqe->lqe_enforced)
                /* not enforced any more, we are good */
                RETURN(0);

        lqe_write_lock(lqe);
        /* use latest usage */
        usage = lqe->lqe_usage;
        /* take pending write into account */
        usage += lqe->lqe_pending_write;

        if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
                /* Yay! we got enough space */
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
        } else if (lqe->lqe_edquot) {
                rc = -EDQUOT;
        } else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);

        RETURN(rc);
}
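
/*
 * Numeric sketch of the check above (illustrative values only): with
 * lqe_usage = 1000, lqe_pending_write = 200, lqe_granted = 2048 and
 * lqe_pending_rel = 0, a request for space = 512 passes since
 * 512 + 1200 <= 2048; the 512 units move from waiting_write to
 * pending_write. A request for space = 1024 would fail with -EAGAIN
 * (or -EDQUOT if the master already declared this ID over quota).
 */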

static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
                           struct quota_body *qbody)
{
        struct qsd_qtype_info   *qqi;
        __u64                    usage, granted;

        if (!lqe->lqe_enforced && op != QSD_REL)
                return false;

        qqi = lqe2qqi(lqe);

        LASSERT(lqe->lqe_pending_rel == 0);
        usage   = lqe->lqe_usage;
        usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted;

        qbody->qb_flags = 0;
again:
        switch (op) {
        case QSD_ACQ:
                /* if we overconsumed quota space, we report usage in request
                 * so that master can adjust it unconditionally */
                if (lqe->lqe_usage > lqe->lqe_granted) {
                        qbody->qb_usage = lqe->lqe_usage;
                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                        granted = lqe->lqe_usage;
                }
                /* acquire as much as needed, but not more */
                if (usage > granted) {
                        qbody->qb_count  = usage - granted;
                        qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
                }
                break;
        case QSD_REP:
                /* When reporting quota (during reintegration or on setquota
                 * glimpse), we should release granted space if usage is 0.
                 * Otherwise, if the usage is less than granted, we need to
                 * acquire the per-ID lock to make sure the unused grant can be
                 * reclaimed by per-ID lock glimpse. */
                if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
                        LQUOTA_DEBUG(lqe, "Release on report!");
                        GOTO(again, op = QSD_REL);
                } else if (lqe->lqe_usage == lqe->lqe_granted) {
                        LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
                                     "anything on report!");
                } else if (lqe->lqe_usage < lqe->lqe_granted) {
                        LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
                        qbody->qb_count = 0;
                        qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
                } else {
                        LASSERT(lqe->lqe_usage > lqe->lqe_granted);
                        LQUOTA_DEBUG(lqe, "Reporting usage");
                        qbody->qb_usage = lqe->lqe_usage;
                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                }
                break;
        case QSD_REL:
                /* release unused quota space unconditionally */
                if (lqe->lqe_granted > lqe->lqe_usage) {
                        qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
                        qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                }
                break;
        case QSD_ADJ: {
                /* need to re-acquire per-ID lock or release all grant */
                if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
                    lqe->lqe_granted > lqe->lqe_usage)
                        GOTO(again, op = QSD_REP);

                /* release spare grant */
                if (granted > usage + lqe->lqe_qunit) {
                        /* pre-release quota space */
                        qbody->qb_count  = granted - usage;
                        /* if usage == 0, release all granted space */
                        if (usage) {
                                /* try to keep one qunit of quota space */
                                qbody->qb_count -= lqe->lqe_qunit;
                                /* but don't release less than qtune to avoid
                                 * releasing space too often */
                                if (qbody->qb_count < lqe->lqe_qtune)
                                        qbody->qb_count = lqe->lqe_qtune;
                        }
                        qbody->qb_flags = QUOTA_DQACQ_FL_REL;
                        break;
                }

                /* if we overconsumed quota space, we report usage in request
                 * so that master can adjust it unconditionally */
                if (lqe->lqe_usage > lqe->lqe_granted) {
                        qbody->qb_usage = lqe->lqe_usage;
                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                        granted         = lqe->lqe_usage;
                }

                if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
                    lustre_handle_is_used(&lqe->lqe_lockh) && usage > 0 &&
                    lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
                        /* To pre-acquire quota space, we report how much spare
                         * quota space the slave currently owns, and the master
                         * then grants back as much as it can afford given the
                         * current state of affairs */
                        if (granted <= usage)
                                qbody->qb_count = 0;
                        else
                                qbody->qb_count = granted - usage;
                        qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
                }
                break;
        }
        default:
                CERROR("Invalid qsd operation:%u\n", op);
                LBUG();
                break;
        }
        return qbody->qb_flags != 0;
}
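
/*
 * Worked QSD_ADJ release example (hypothetical values): with granted = 8192,
 * usage = 1000, lqe_qunit = 1024 and lqe_qtune = 512, the spare-grant test
 * holds (8192 > 1000 + 1024), so qb_count starts at 7192; since usage != 0,
 * one qunit is kept back, leaving qb_count = 6168, which is above qtune and
 * therefore released as-is with QUOTA_DQACQ_FL_REL set.
 */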

/*
 * Acquire/release quota space from the master.
 * There is at most one in-flight dqacq/dqrel request per ID.
 *
 * \param env    - the environment passed by the caller
 * \param lqe    - is the qid entry to be processed
 * \param op     - operation the caller wants to perform
 *
 * \retval 0     - success
 * \retval -EDQUOT      : out of quota
 *         -EINPROGRESS : inform client to retry write/create
 *         -ve          : other appropriate errors
 */
int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
              enum qsd_ops op)
{
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
        struct ldlm_lock        *lock;
        int                      rc;
        bool                     intent = false, sync;
        ENTRY;

        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;

        if (qsd->qsd_stopping) {
                LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
                /* Target is about to shut down, client will retry */
                RETURN(-EINPROGRESS);
        }

        if (!qsd_ready(qsd)) {
                LQUOTA_DEBUG(lqe, "Connection to master not ready");
                RETURN(-ENOTCONN);
        }

        /* In most cases, reintegration must have been triggered already (when
         * quota is enabled or when the OST starts); however, in a rare race
         * (enabling quota while OSTs are starting), we might miss triggering
         * reintegration for some qqi.
         *
         * If the previous reintegration failed for some reason, we'll
         * re-trigger it here as well. */
        if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
                LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
                             " off reintegration");
                qsd_start_reint_thread(qqi);
                RETURN(-EINPROGRESS);
        }

        LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);

        /* Fill the remote global lock handle, master will check this handle
         * to see if the slave is sending request with stale lock */
        cfs_read_lock(&qsd->qsd_lock);
        lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
        cfs_read_unlock(&qsd->qsd_lock);

        if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
                RETURN(-ENOLCK);

        lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
        if (lock == NULL)
                RETURN(-ENOLCK);
        lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
        LDLM_LOCK_PUT(lock);

        /* We allow only one in-flight dqacq/dqrel for a given qid; if there
         * is already an in-flight dqacq/dqrel:
         *
         * - For QSD_ADJ: we should just abort it, since the local limit is
         *   going to be changed soon;
         * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
         *   to finish, and return success to the caller. The caller is
         *   responsible for retrying;
         * - For QSD_REP: we should just abort it, since the slave has already
         *   acquired/released grant; */
        sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
        LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
                 lqe->lqe_pending_req);

        lqe_write_lock(lqe);
        if (lqe->lqe_pending_req != 0) {
                struct l_wait_info lwi = { 0 };

                lqe_write_unlock(lqe);
                if (!sync) {
                        LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
                        RETURN(0);
                }

                LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
                l_wait_event(lqe->lqe_waiters,
                             !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
                             &lwi);
                RETURN(0);
        }

        /* fill qb_count & qb_flags */
        if (!qsd_calc_space(lqe, op, qbody)) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
                RETURN(0);
        }
        lqe->lqe_pending_req++;
        lqe_write_unlock(lqe);

        /* fill other quota body fields */
        qbody->qb_fid = qqi->qqi_fid;
        qbody->qb_id  = lqe->lqe_id;
        memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
        memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));

        /* hold a refcount until completion */
        lqe_getref(lqe);

        if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
                /* check whether we already own a lock for this ID */
                lqe_read_lock(lqe);
                lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
                lqe_read_unlock(lqe);

                rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
                if (rc) {
                        memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
                        if (req_is_preacq(qbody->qb_flags)) {
                                if (req_has_rep(qbody->qb_flags))
                                        /* still want to report usage */
                                        qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
                                else
                                        /* no pre-acquire if no per-ID lock */
                                        GOTO(out, rc = -ENOLCK);
                        } else {
                                /* no lock found, should use intent */
                                intent = true;
                        }
                } else if (req_is_acq(qbody->qb_flags) &&
                           qbody->qb_count == 0) {
                        /* found cached lock, no need to acquire */
                        GOTO(out, rc = 0);
                }
        }

        if (!intent) {
                rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
                                    qsd_dqacq_completion, qqi, &qti->qti_lockh,
                                    lqe);
        } else {
                union ldlm_wire_lvb *lvb;

                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL)
                        GOTO(out, rc = -ENOMEM);

                rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
                                     IT_QUOTA_DQACQ, qsd_dqacq_completion,
                                     qqi, lvb, (void *)lqe);
        }
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
out:
        qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
                             rc);
        return rc;
}

/*
 * Quota enforcement handler. If local quota can satisfy this operation,
 * return success; otherwise, acquire more quota from the master.
 * (for write operations, if the master isn't available at this moment,
 * return -EINPROGRESS to inform the client to retry the write)
 *
 * \param env   - the environment passed by the caller
 * \param qqi   - is the qsd_qtype_info structure associated with the quota ID
 *                subject to the operation
 * \param qid   - is the qid information attached in the transaction handle
 * \param space - is the space required by the operation
 * \param flags - if the operation is a write, used to return the over-quota
 *                user/group flags and the sync commit flag to the caller
 *
 * \retval 0        - success
 * \retval -EDQUOT      : out of quota
 *         -EINPROGRESS : inform client to retry write
 *         -ve          : other appropriate errors
 */
static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
                         int *flags)
{
        struct lquota_entry *lqe;
        int                  rc = 0, retry_cnt;
        ENTRY;

        if (qid->lqi_qentry != NULL) {
                /* we already had to deal with this id for this transaction */
                lqe = qid->lqi_qentry;
                if (!lqe->lqe_enforced)
                        RETURN(0);
        } else {
                /* look up lquota entry associated with qid */
                lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
                if (IS_ERR(lqe))
                        RETURN(PTR_ERR(lqe));
                if (!lqe->lqe_enforced) {
                        lqe_putref(lqe);
                        RETURN(0);
                }
                qid->lqi_qentry = lqe;
                /* lqe will be released in qsd_op_end() */
        }

        if (space <= 0) {
                /* when space is negative or zero, we don't need to consume
                 * quota space. That said, we still want to perform space
                 * adjustments in qsd_op_end, so we return here, but with
                 * a reference on the lqe */
                if (flags != NULL) {
                        rc = qsd_refresh_usage(env, lqe);
                        GOTO(out_flags, rc);
                }
                RETURN(0);
        }

        LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);

        lqe_write_lock(lqe);
        lqe->lqe_waiting_write += space;
        lqe_write_unlock(lqe);

        for (retry_cnt = 0; rc == 0; retry_cnt++) {
                /* refresh disk usage if required */
                rc = qsd_refresh_usage(env, lqe);
                if (rc)
                        break;

                /* try to consume local quota space */
                rc = qsd_acquire_local(lqe, space);
                if (rc != -EAGAIN)
                        /* rc == 0, Wouhou! enough local quota space
                         * rc < 0, something bad happened */
                        break;

                /* need to acquire more quota space from master, this is done
                 * synchronously */
                rc = qsd_dqacq(env, lqe, QSD_ACQ);
                LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
                             retry_cnt, rc);
        }

        if (rc == 0) {
                qid->lqi_space += space;
        } else {
                LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);

                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;

                if (flags && lqe->lqe_pending_write != 0)
                        /* Inform the OSD layer that there are pending writes.
                         * It might want to retry after a sync if appropriate */
                        *flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);

                /* convert recoverable error into -EINPROGRESS, and the client
                 * will retry the write on -EINPROGRESS. */
                if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
                    rc == -EAGAIN || rc == -EINTR)
                        rc = -EINPROGRESS;
        }

        if (flags != NULL) {
out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
                        *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
                } else {
                        __u64   usage;

                        lqe_read_lock(lqe);
                        usage  = lqe->lqe_usage;
                        usage += lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
                        usage += qqi->qqi_qsd->qsd_sync_threshold;

                        /* check whether we should notify the client to start
                         * a sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
                                *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
                        else
                                *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
                        lqe_read_unlock(lqe);
                }
        }
        RETURN(rc);
}
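
/*
 * Illustration of the early-sync check above (made-up numbers): with
 * usage = 900, pending + waiting writes = 50 and qsd_sync_threshold = 100,
 * the effective usage is 1050; against granted - pending_rel = 1024, the
 * test 1050 >= 1024 holds, so the over-quota flag is returned and the
 * client is told to switch to sync writes before the grant actually runs
 * out.
 */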

static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
{
        if (q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
}

/*
 * Enforce quota; this is called in the declaration phase of each operation.
 * qsd_op_end() will then be called later, once all the operations have been
 * completed, in order to release/adjust the quota space.
 *
 * \param env        - the environment passed by the caller
 * \param qsd        - is the qsd instance associated with the device in charge
 *                     of the operation.
 * \param trans      - is the quota transaction information
 * \param qi         - qid & space required by current operation
 * \param flags      - if the operation is a write, used to return the
 *                     over-quota user/group flags and the sync commit flag
 *                     to the caller
 *
 * \retval 0        - success
 * \retval -EDQUOT      : out of quota
 *         -EINPROGRESS : inform client to retry write
 *         -ve          : other appropriate errors
 */
int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
                 int *flags)
{
        int     i, rc;
        bool    found = false;
        ENTRY;

        if (unlikely(qsd == NULL))
                RETURN(0);

        /* We don't enforce quota until the qsd_instance is started */
        cfs_read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
                cfs_read_unlock(&qsd->qsd_lock);
                RETURN(0);
        }
        cfs_read_unlock(&qsd->qsd_lock);

        /* ignore block quota on MDTs, ignore inode quota on OSTs */
        if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
            (qsd->qsd_is_md && qi->lqi_is_blk))
                RETURN(0);

        /* ignore quota enforcement request when:
         *    - quota isn't enforced for this quota type
         * or - the user/group is root */
        if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
                RETURN(0);

        LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
                 trans->lqt_id_cnt);
        /* check whether we already allocated a slot for this id */
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
                        /* make sure we are not mixing inodes & blocks */
                        LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }

        if (!found) {
                if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
                        CERROR("%s: more than %d qids enforced for a "
                               "transaction?\n", qsd->qsd_svname, i);
                        RETURN(-EINVAL);
                }

                /* fill new slot */
                trans->lqt_ids[i].lqi_id     = qi->lqi_id;
                trans->lqt_ids[i].lqi_type   = qi->lqi_type;
                trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
                trans->lqt_id_cnt++;
        }

        /* manage quota enforcement for this ID */
        rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
                           &trans->lqt_ids[i], qi->lqi_space, flags);
        RETURN(rc);
}
EXPORT_SYMBOL(qsd_op_begin);
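
/*
 * Hypothetical caller sketch (names and values illustrative only, not the
 * actual OSD code): a block-write declaration on an OST would do roughly
 * the following around the transaction:
 *
 *      struct lquota_trans     trans;
 *      struct lquota_id_info   qi;
 *      int                     rc, flags = 0;
 *
 *      memset(&trans, 0, sizeof(trans));
 *      memset(&qi, 0, sizeof(qi));
 *      qi.lqi_id.qid_uid = uid;        // owner of the write
 *      qi.lqi_type       = USRQUOTA;
 *      qi.lqi_is_blk     = true;
 *      qi.lqi_space      = nr_blocks;  // space the write may consume
 *      rc = qsd_op_begin(env, qsd, &trans, &qi, &flags);
 *      ... declare/perform the write, honouring flags ...
 *      qsd_op_end(env, qsd, &trans);   // releases refs, adjusts space
 */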

/**
 * Post quota operation: pre-acquire/release quota from the master.
 *
 * \param  env  - the environment passed by the caller
 * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
 *                subject to the operation
 * \param  qid  - stores information related to this ID for the operation
 *                which has just completed
 */
static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                        struct lquota_id_info *qid)
{
        struct lquota_entry     *lqe;
        bool                     adjust;
        ENTRY;

        lqe = qid->lqi_qentry;
        if (lqe == NULL)
                RETURN_EXIT;
        qid->lqi_qentry = NULL;

        /* refresh cached usage if a suitable environment is passed */
        if (env != NULL)
                qsd_refresh_usage(env, lqe);

        lqe_write_lock(lqe);
        if (qid->lqi_space > 0)
                lqe->lqe_pending_write -= qid->lqi_space;
        if (env != NULL)
                adjust = qsd_adjust_needed(lqe);
        else
                adjust = true;
        lqe_write_unlock(lqe);

        if (adjust) {
                /* pre-acquire/release quota space is needed */
                if (env != NULL)
                        qsd_dqacq(env, lqe, QSD_ADJ);
                else
                        /* no suitable environment, handle adjustment in
                         * separate thread context */
                        qsd_adjust_schedule(lqe, false, false);
        }
        lqe_putref(lqe);
        EXIT;
}

/*
 * Post quota operation. It's called after the transaction for each operation
 * has stopped.
 *
 * \param  env   - the environment passed by the caller
 * \param  qsd   - is the qsd instance associated with the device which is
 *                 handling the operation.
 * \param  trans - is the quota transaction handle holding all qids involved
 *                 in the operation
 */
void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
                struct lquota_trans *trans)
{
        int i;
        ENTRY;

        if (unlikely(qsd == NULL))
                RETURN_EXIT;

        /* We don't enforce quota until the qsd_instance is started */
        cfs_read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
                cfs_read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
        cfs_read_unlock(&qsd->qsd_lock);

        LASSERT(trans != NULL);

        for (i = 0; i < trans->lqt_id_cnt; i++) {
                struct qsd_qtype_info *qqi;

                if (trans->lqt_ids[i].lqi_qentry == NULL)
                        continue;

                qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
                qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
        }

        /* reset id_count to 0 so that a second accidental call to qsd_op_end()
         * does not result in failure */
        trans->lqt_id_cnt = 0;
        EXIT;
}
EXPORT_SYMBOL(qsd_op_end);

void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
                      union lquota_id *qid, int qtype)
{
        struct lquota_entry    *lqe;
        struct qsd_qtype_info  *qqi;
        bool                    adjust;
        ENTRY;

        if (unlikely(qsd == NULL))
                RETURN_EXIT;

        /* We don't enforce quota until the qsd_instance is started */
        cfs_read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
                cfs_read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
        cfs_read_unlock(&qsd->qsd_lock);

        qqi = qsd->qsd_type_array[qtype];
        LASSERT(qqi);

        if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
            qid->qid_uid == 0)
                RETURN_EXIT;

        lqe = lqe_locate(env, qqi->qqi_site, qid);
        if (IS_ERR(lqe)) {
                CERROR("%s: failed to locate lqe for id:"LPU64", type:%d\n",
                       qsd->qsd_svname, qid->qid_uid, qtype);
                RETURN_EXIT;
        }

        qsd_refresh_usage(env, lqe);

        lqe_read_lock(lqe);
        adjust = qsd_adjust_needed(lqe);
        lqe_read_unlock(lqe);

        if (adjust)
                qsd_dqacq(env, lqe, QSD_ADJ);

        lqe_putref(lqe);
        EXIT;
}
EXPORT_SYMBOL(qsd_adjust_quota);
953 EXPORT_SYMBOL(qsd_adjust_quota);