Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qmt_internal.h"
41
42 /* intent policy function called from mdt_intent_opc() when the intent is of
43  * quota type */
44 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
45                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
46                       int flags)
47 {
48         struct qmt_device       *qmt = lu2qmt_dev(ld);
49         struct ldlm_intent      *it;
50         struct quota_body       *reqbody;
51         struct quota_body       *repbody;
52         struct obd_uuid         *uuid;
53         struct lquota_lvb       *lvb;
54         struct ldlm_resource    *res = (*lockp)->l_resource;
55         int                      rc;
56         ENTRY;
57
58         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59
60         /* extract quota body and intent opc */
61         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
62         if (it == NULL)
63                 RETURN(err_serious(-EFAULT));
64
65         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
66         if (reqbody == NULL)
67                 RETURN(err_serious(-EFAULT));
68
69         /* prepare reply */
70         rc = req_capsule_server_pack(&req->rq_pill);
71         if (rc != 0) {
72                 CERROR("Can't pack response, rc %d\n", rc);
73                 RETURN(err_serious(rc));
74         }
75
76         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
77         if (repbody == NULL)
78                 RETURN(err_serious(-EFAULT));
79
80         uuid = &(*lockp)->l_export->exp_client_uuid;
81         switch (it->opc) {
82
83         case IT_QUOTA_DQACQ: {
84                 struct lquota_entry     *lqe;
85                 struct ldlm_lock        *lock;
86
87                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
88                         /* acquire on global lock? something is wrong ... */
89                         GOTO(out, rc = -EPROTO);
90
91                 /* verify global lock isn't stale */
92                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
93                         GOTO(out, rc = -ENOLCK);
94
95                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
96                 if (lock == NULL)
97                         GOTO(out, rc = -ENOLCK);
98                 LDLM_LOCK_PUT(lock);
99
100                 lqe = res->lr_lvb_data;
101                 LASSERT(lqe != NULL);
102                 lqe_getref(lqe);
103
104                 /* acquire quota space */
105                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
106                                 reqbody->qb_count, reqbody->qb_usage,
107                                 repbody);
108                 lqe_putref(lqe);
109                 if (rc)
110                         GOTO(out, rc);
111                 break;
112         }
113
114         case IT_QUOTA_CONN:
115                 /* new connection from slave */
116
117                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
118                         /* connection on per-ID lock? something is wrong ... */
119                         GOTO(out, rc = -EPROTO);
120
121                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
122                                        &repbody->qb_slv_fid,
123                                        &repbody->qb_slv_ver, uuid);
124                 if (rc)
125                         GOTO(out, rc);
126                 break;
127
128         default:
129                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
130                        it->opc);
131                 GOTO(out, rc = err_serious(-EINVAL));
132         }
133
134         /* on success, pack lvb in reply */
135         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
136                              ldlm_lvbo_size(*lockp));
137         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
138         ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
139         EXIT;
140 out:
141         return rc;
142 }
143
144 /*
145  * Initialize quota LVB associated with quota indexes.
146  * Called with res->lr_lvb_sem held
147  */
148 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
149 {
150         struct lu_env           *env;
151         struct qmt_thread_info  *qti;
152         struct qmt_device       *qmt = lu2qmt_dev(ld);
153         int                      pool_id, pool_type, qtype;
154         int                      rc;
155         ENTRY;
156
157         LASSERT(res != NULL);
158
159         if (res->lr_type != LDLM_PLAIN)
160                 RETURN(-ENOTSUPP);
161
162         if (res->lr_lvb_data ||
163             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
164                 RETURN(0);
165
166         OBD_ALLOC_PTR(env);
167         if (env == NULL)
168                 RETURN(-ENOMEM);
169
170         /* initialize environment */
171         rc = lu_env_init(env, LCT_MD_THREAD);
172         if (rc) {
173                 OBD_FREE_PTR(env);
174                 RETURN(rc);
175         }
176         qti = qmt_info(env);
177
178         /* extract global index FID and quota identifier */
179         fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
180
181         /* sanity check the global index FID */
182         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
183         if (rc) {
184                 CERROR("can't extract pool information from FID "DFID"\n",
185                        PFID(&qti->qti_fid));
186                 GOTO(out, rc);
187         }
188
189         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
190                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
191                  * we are thus dealing with an ID lock. */
192                 struct lquota_entry     *lqe;
193
194                 /* Find the quota entry associated with the quota id */
195                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
196                                           &qti->qti_id);
197                 if (IS_ERR(lqe))
198                         GOTO(out, rc = PTR_ERR(lqe));
199
200                 /* store reference to lqe in lr_lvb_data */
201                 res->lr_lvb_data = lqe;
202                 LQUOTA_DEBUG(lqe, "initialized res lvb");
203         } else {
204                 struct dt_object        *obj;
205
206                 /* lookup global index */
207                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
208                 if (IS_ERR(obj))
209                         GOTO(out, rc = PTR_ERR(obj));
210                 if (!dt_object_exists(obj)) {
211                         lu_object_put(env, &obj->do_lu);
212                         GOTO(out, rc = -ENOENT);
213                 }
214
215                 /* store reference to global index object in lr_lvb_data */
216                 res->lr_lvb_data = obj;
217                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
218         }
219
220         res->lr_lvb_len  = sizeof(struct lquota_lvb);
221         EXIT;
222 out:
223         lu_env_fini(env);
224         OBD_FREE_PTR(env);
225         return rc;
226 }
227
228 /*
229  * Update LVB associated with the global quota index.
230  * This function is called from the DLM itself after a glimpse callback, in this
231  * case valid ptlrpc request is passed.
232  */
233 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
234                     struct ptlrpc_request *req, int increase_only)
235 {
236         struct lu_env           *env;
237         struct qmt_thread_info  *qti;
238         struct qmt_device       *qmt = lu2qmt_dev(ld);
239         struct lquota_entry     *lqe;
240         struct lquota_lvb       *lvb;
241         struct ldlm_lock        *lock;
242         struct obd_export       *exp;
243         int                      rc = 0;
244         ENTRY;
245
246         LASSERT(res != NULL);
247
248         if (req == NULL)
249                 RETURN(0);
250
251         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
252                 /* no need to update lvb for global quota locks */
253                 RETURN(0);
254
255         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
256         if (lvb == NULL) {
257                 CERROR("%s: failed to extract lvb from request\n",
258                        qmt->qmt_svname);
259                 RETURN(-EFAULT);
260         }
261
262         lqe = res->lr_lvb_data;
263         LASSERT(lqe != NULL);
264         lqe_getref(lqe);
265
266         LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
267                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
268
269         if (lvb->lvb_id_rel == 0) {
270                 /* nothing to release */
271                 if (lvb->lvb_id_may_rel != 0)
272                         /* but might still release later ... */
273                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
274                 GOTO(out_lqe, rc = 0);
275         }
276
277         /* allocate environement */
278         OBD_ALLOC_PTR(env);
279         if (env == NULL)
280                 GOTO(out_lqe, rc = -ENOMEM);
281
282         /* initialize environment */
283         rc = lu_env_init(env, LCT_MD_THREAD);
284         if (rc)
285                 GOTO(out_env, rc);
286         qti = qmt_info(env);
287
288         /* The request is a glimpse callback which was sent via the
289          * reverse import to the slave. What we care about here is the
290          * export associated with the slave and req->rq_export is
291          * definitely not what we are looking for (it is actually set to
292          * NULL here).
293          * Therefore we extract the lock from the request argument
294          * and use lock->l_export. */
295         lock = ldlm_request_lock(req);
296         if (IS_ERR(lock)) {
297                 CERROR("%s: failed to get lock from request!\n",
298                        qmt->qmt_svname);
299                 GOTO(out_env_init, rc = PTR_ERR(lock));
300         }
301
302         exp = class_export_get(lock->l_export);
303         if (exp == NULL) {
304                 CERROR("%s: failed to get export from lock!\n",
305                        qmt->qmt_svname);
306                 GOTO(out_env_init, rc = -EFAULT);
307         }
308
309         /* release quota space */
310         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
311                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
312         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
313                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
314                              LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
315                              lvb->lvb_id_rel, rc);
316         class_export_put(exp);
317         if (rc)
318                 GOTO(out_env_init, rc);
319         EXIT;
320 out_env_init:
321         lu_env_fini(env);
322 out_env:
323         OBD_FREE_PTR(env);
324 out_lqe:
325         lqe_putref(lqe);
326         return rc;
327 }
328
329 /*
330  * Report size of lvb to ldlm layer in order to allocate lvb buffer
331  * As far as quota locks are concerned, the size is static and is the same
332  * for both global and per-ID locks which shares the same lvb format.
333  */
334 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
335 {
336         return sizeof(struct lquota_lvb);
337 }
338
339 /*
340  * Fill request buffer with quota lvb
341  */
342 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
343                   int lvblen)
344 {
345         struct ldlm_resource    *res = lock->l_resource;
346         struct lquota_lvb       *qlvb = lvb;
347         ENTRY;
348
349         LASSERT(res != NULL);
350
351         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
352             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
353                 RETURN(-EINVAL);
354
355         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
356                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
357                  * we are thus dealing with an ID lock. */
358                 struct lquota_entry     *lqe = res->lr_lvb_data;
359
360                 /* return current qunit value & edquot flags in lvb */
361                 lqe_getref(lqe);
362                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
363                 qlvb->lvb_flags = 0;
364                 if (lqe->lqe_edquot)
365                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
366                 lqe_putref(lqe);
367         } else {
368                 /* global quota lock */
369                 struct lu_env           *env;
370                 int                      rc;
371                 struct dt_object        *obj = res->lr_lvb_data;
372
373                 OBD_ALLOC_PTR(env);
374                 if (env == NULL)
375                         RETURN(-ENOMEM);
376
377                 /* initialize environment */
378                 rc = lu_env_init(env, LCT_LOCAL);
379                 if (rc) {
380                         OBD_FREE_PTR(env);
381                         RETURN(rc);
382                 }
383
384                 /* return current version of global index */
385                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
386
387                 lu_env_fini(env);
388                 OBD_FREE_PTR(env);
389         }
390
391         RETURN(sizeof(struct lquota_lvb));
392 }
393
394 /*
395  * Free lvb associated with a given ldlm resource
396  * we don't really allocate a lvb, lr_lvb_data just points to
397  * the appropriate backend structures.
398  */
399 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
400 {
401         ENTRY;
402
403         if (res->lr_lvb_data == NULL)
404                 RETURN(0);
405
406         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
407                 struct lquota_entry     *lqe = res->lr_lvb_data;
408
409                 /* release lqe reference */
410                 lqe_putref(lqe);
411         } else {
412                 struct dt_object        *obj = res->lr_lvb_data;
413                 struct lu_env           *env;
414                 int                      rc;
415
416                 OBD_ALLOC_PTR(env);
417                 if (env == NULL)
418                         RETURN(-ENOMEM);
419
420                 /* initialize environment */
421                 rc = lu_env_init(env, LCT_LOCAL);
422                 if (rc) {
423                         OBD_FREE_PTR(env);
424                         RETURN(rc);
425                 }
426
427                 /* release object reference */
428                 lu_object_put(env, &obj->do_lu);
429                 lu_env_fini(env);
430                 OBD_FREE_PTR(env);
431         }
432
433         res->lr_lvb_data = NULL;
434         res->lr_lvb_len  = 0;
435
436         RETURN(0);
437 }
438
439 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
440                                 struct obd_uuid *, union ldlm_gl_desc *,
441                                 void *);
442 /*
443  * Send glimpse callback to slaves holding a lock on resource \res.
444  * This is used to notify slaves of new quota settings or to claim quota space
445  * back.
446  *
447  * \param env  - is the environment passed by the caller
448  * \param qmt  - is the quota master target
449  * \param res  - is the dlm resource associated with the quota object
450  * \param desc - is the glimpse descriptor to pack in glimpse callback
451  * \param cb   - is the callback function called on every lock and determine
452  *               whether a glimpse should be issued
453  * \param arg  - is an opaq parameter passed to the callback function
454  */
455 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
456                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
457                             qmt_glimpse_cb_t cb, void *arg)
458 {
459         cfs_list_t      *tmp, *pos;
460         CFS_LIST_HEAD(gl_list);
461         int              rc = 0;
462         ENTRY;
463
464         lock_res(res);
465         /* scan list of granted locks */
466         cfs_list_for_each(pos, &res->lr_granted) {
467                 struct ldlm_glimpse_work        *work;
468                 struct ldlm_lock                *lock;
469                 struct obd_uuid                 *uuid;
470
471                 lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
472                 LASSERT(lock->l_export);
473                 uuid = &lock->l_export->exp_client_uuid;
474
475                 if (cb != NULL) {
476                         rc = cb(env, qmt, uuid, desc, arg);
477                         if (rc == 0)
478                                 /* slave should not be notified */
479                                 continue;
480                         if (rc < 0)
481                                 /* something wrong happened, we still notify */
482                                 CERROR("%s: callback function failed to "
483                                        "determine whether slave %s should be "
484                                        "notified (%d)\n", qmt->qmt_svname,
485                                        obd_uuid2str(uuid), rc);
486                 }
487
488                 OBD_ALLOC_PTR(work);
489                 if (work == NULL) {
490                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
491                                obd_uuid2str(uuid));
492                         continue;
493                 }
494
495                 cfs_list_add_tail(&work->gl_list, &gl_list);
496                 work->gl_lock  = LDLM_LOCK_GET(lock);
497                 work->gl_flags = 0;
498                 work->gl_desc  = desc;
499
500         }
501         unlock_res(res);
502
503         if (cfs_list_empty(&gl_list)) {
504                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
505                 RETURN(0);
506         }
507
508         /* issue glimpse callbacks to all connected slaves */
509         rc = ldlm_glimpse_locks(res, &gl_list);
510
511         cfs_list_for_each_safe(pos, tmp, &gl_list) {
512                 struct ldlm_glimpse_work *work;
513
514                 work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
515
516                 cfs_list_del(&work->gl_list);
517                 CERROR("%s: failed to notify %s of new quota settings\n",
518                        qmt->qmt_svname,
519                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
520                 LDLM_LOCK_RELEASE(work->gl_lock);
521                 OBD_FREE_PTR(work);
522         }
523
524         RETURN(rc);
525 }
526
527 /*
528  * Send glimpse request to all global quota locks to push new quota setting to
529  * slaves.
530  *
531  * \param env - is the environment passed by the caller
532  * \param lqe - is the lquota entry which has new settings
533  * \param ver - is the version associated with the setting change
534  */
535 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
536                          __u64 ver)
537 {
538         struct qmt_thread_info  *qti = qmt_info(env);
539         struct qmt_pool_info    *pool = lqe2qpi(lqe);
540         struct ldlm_resource    *res = NULL;
541         int                      rc;
542         ENTRY;
543
544         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
545                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
546
547         /* send glimpse callback to notify slaves of new quota settings */
548         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
549         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
550         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
551         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
552         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
553
554         /* look up ldlm resource associated with global index */
555         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
556         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
557                                 LDLM_PLAIN, 0);
558         if (res == NULL) {
559                 /* this might happen if no slaves have enqueued global quota
560                  * locks yet */
561                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
562                              "with "DFID, PFID(&qti->qti_fid));
563                 RETURN_EXIT;
564         }
565
566         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
567                               NULL, NULL);
568         ldlm_resource_putref(res);
569         EXIT;
570 }
571
572 /* Callback function used to select locks that should be glimpsed when
573  * broadcasting the new qunit value */
574 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
575                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
576                           void *arg)
577 {
578         struct obd_uuid *slv_uuid = arg;
579
580         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
581                 RETURN(0);
582         RETURN(+1);
583 }
584
585 /*
586  * Send glimpse request on per-ID lock to push new qunit value to slave.
587  *
588  * \param env  - is the environment passed by the caller
589  * \param qmt  - is the quota master target device
590  * \param lqe  - is the lquota entry with the new qunit value
591  * \param uuid - is the uuid of the slave acquiring space, if any
592  */
593 static void qmt_id_lock_glimpse(const struct lu_env *env,
594                                 struct qmt_device *qmt,
595                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
596 {
597         struct qmt_thread_info  *qti = qmt_info(env);
598         struct qmt_pool_info    *pool = lqe2qpi(lqe);
599         struct ldlm_resource    *res = NULL;
600         int                      rc;
601         ENTRY;
602
603         if (!lqe->lqe_enforced)
604                 RETURN_EXIT;
605
606         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
607                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
608         fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
609         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
610                                 0);
611         if (res == NULL) {
612                 /* this might legitimately happens if slaves haven't had the
613                  * opportunity to enqueue quota lock yet. */
614                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
615                              "lock "DFID, PFID(&qti->qti_fid));
616                 lqe_write_lock(lqe);
617                 if (lqe->lqe_revoke_time == 0 &&
618                     lqe->lqe_qunit == pool->qpi_least_qunit)
619                         lqe->lqe_revoke_time = cfs_time_current_64();
620                 lqe_write_unlock(lqe);
621                 RETURN_EXIT;
622         }
623
624         lqe_write_lock(lqe);
625         /* The purpose of glimpse callback on per-ID lock is twofold:
626          * - notify slaves of new qunit value and hope they will release some
627          *   spare quota space in return
628          * - notify slaves that master ran out of quota space and there is no
629          *   need to send acquire request any more until further notice */
630
631         /* fill glimpse descriptor with lqe settings */
632         if (lqe->lqe_edquot)
633                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
634         else
635                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
636         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
637
638         if (lqe->lqe_revoke_time == 0 &&
639             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
640                 /* reset lqe_may_rel, it will be updated on glimpse callback
641                  * replies if needed */
642                 lqe->lqe_may_rel = 0;
643
644         /* The rebalance thread is the only thread which can issue glimpses */
645         LASSERT(!lqe->lqe_gl);
646         lqe->lqe_gl = true;
647         lqe_write_unlock(lqe);
648
649         /* issue glimpse callback to slaves */
650         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
651                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
652
653         lqe_write_lock(lqe);
654         if (lqe->lqe_revoke_time == 0 &&
655             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
656             lqe->lqe_qunit == pool->qpi_least_qunit) {
657                 lqe->lqe_revoke_time = cfs_time_current_64();
658                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
659         }
660         LASSERT(lqe->lqe_gl);
661         lqe->lqe_gl = false;
662         lqe_write_unlock(lqe);
663
664         ldlm_resource_putref(res);
665         EXIT;
666 }
667
668 /*
669  * Schedule a glimpse request on per-ID locks to push new qunit value or
670  * edquot flag to quota slaves.
671  *
672  * \param qmt  - is the quota master target device
673  * \param lqe  - is the lquota entry with the new qunit value
674  */
675 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
676 {
677         bool    added = false;
678         ENTRY;
679
680         lqe_getref(lqe);
681         spin_lock(&qmt->qmt_reba_lock);
682         if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
683                 cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
684                 added = true;
685         }
686         spin_unlock(&qmt->qmt_reba_lock);
687
688         if (added)
689                 cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
690         else
691                 lqe_putref(lqe);
692         EXIT;
693 }
694
695 /*
696  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
697  * quota locks owned by slaves in order to notify them of:
698  * - a qunit shrink in which case slaves might release quota space back in
699  *   glimpse reply.
700  * - set/clear edquot flag used to cache the "quota exhausted" state of the
701  *   master. When the flag is set, slaves know that there is no need to
702  *   try to acquire quota from the master since this latter has already
703  *   distributed all the space.
704  */
705 static int qmt_reba_thread(void *arg)
706 {
707         struct qmt_device       *qmt = (struct qmt_device *)arg;
708         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
709         struct l_wait_info       lwi = { 0 };
710         struct lu_env           *env;
711         struct lquota_entry     *lqe, *tmp;
712         char                     pname[MTI_NAME_MAXLEN];
713         int                      rc;
714         ENTRY;
715
716         OBD_ALLOC_PTR(env);
717         if (env == NULL)
718                 RETURN(-ENOMEM);
719
720         rc = lu_env_init(env, LCT_MD_THREAD);
721         if (rc) {
722                 CERROR("%s: failed to init env.", qmt->qmt_svname);
723                 OBD_FREE_PTR(env);
724                 RETURN(rc);
725         }
726
727         snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
728         cfs_daemonize(pname);
729
730         thread_set_flags(thread, SVC_RUNNING);
731         cfs_waitq_signal(&thread->t_ctl_waitq);
732
733         while (1) {
734                 l_wait_event(thread->t_ctl_waitq,
735                              !cfs_list_empty(&qmt->qmt_reba_list) ||
736                              !thread_is_running(thread), &lwi);
737
738                 spin_lock(&qmt->qmt_reba_lock);
739                 cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
740                                              lqe_link) {
741                         cfs_list_del_init(&lqe->lqe_link);
742                         spin_unlock(&qmt->qmt_reba_lock);
743
744                         if (thread_is_running(thread))
745                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
746
747                         lqe_putref(lqe);
748                         spin_lock(&qmt->qmt_reba_lock);
749                 }
750                 spin_unlock(&qmt->qmt_reba_lock);
751
752                 if (!thread_is_running(thread))
753                         break;
754         }
755         lu_env_fini(env);
756         OBD_FREE_PTR(env);
757         thread_set_flags(thread, SVC_STOPPED);
758         cfs_waitq_signal(&thread->t_ctl_waitq);
759         RETURN(rc);
760 }
761
762 /*
763  * Start rebalance thread. Called when the QMT is being setup
764  */
765 int qmt_start_reba_thread(struct qmt_device *qmt)
766 {
767         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
768         struct l_wait_info       lwi    = { 0 };
769         int                      rc;
770         ENTRY;
771
772         rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
773         if (rc < 0) {
774                 CERROR("%s: failed to start rebalance thread (%d)\n",
775                        qmt->qmt_svname, rc);
776                 thread_set_flags(thread, SVC_STOPPED);
777                 RETURN(rc);
778         }
779
780         l_wait_event(thread->t_ctl_waitq,
781                      thread_is_running(thread) || thread_is_stopped(thread),
782                      &lwi);
783
784         RETURN(0);
785 }
786
787 /*
788  * Stop rebalance thread. Called when the QMT is about to shutdown.
789  */
790 void qmt_stop_reba_thread(struct qmt_device *qmt)
791 {
792         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
793
794         if (!thread_is_stopped(thread)) {
795                 struct l_wait_info lwi = { 0 };
796
797                 thread_set_flags(thread, SVC_STOPPING);
798                 cfs_waitq_signal(&thread->t_ctl_waitq);
799
800                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
801                              &lwi);
802         }
803         LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
804 }