Whamcloud - gitweb
LU-1346 libcfs: cleanup libcfs primitive (linux-prim.h)
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <lustre_dlm.h>
34 #include <obd_class.h>
35
36 #include "qmt_internal.h"
37
38 /* intent policy function called from mdt_intent_opc() when the intent is of
39  * quota type */
40 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
41                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
42                       int flags)
43 {
44         struct qmt_device       *qmt = lu2qmt_dev(ld);
45         struct ldlm_intent      *it;
46         struct quota_body       *reqbody;
47         struct quota_body       *repbody;
48         struct obd_uuid         *uuid;
49         struct lquota_lvb       *lvb;
50         struct ldlm_resource    *res = (*lockp)->l_resource;
51         int                      rc, lvb_len;
52         ENTRY;
53
54         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
55         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
56                              ldlm_lvbo_size(*lockp));
57
58         /* extract quota body and intent opc */
59         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
60         if (it == NULL)
61                 RETURN(err_serious(-EFAULT));
62
63         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
64         if (reqbody == NULL)
65                 RETURN(err_serious(-EFAULT));
66
67         /* prepare reply */
68         rc = req_capsule_server_pack(&req->rq_pill);
69         if (rc != 0) {
70                 CERROR("Can't pack response, rc %d\n", rc);
71                 RETURN(err_serious(rc));
72         }
73
74         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
75         if (repbody == NULL)
76                 RETURN(err_serious(-EFAULT));
77
78         uuid = &(*lockp)->l_export->exp_client_uuid;
79         switch (it->opc) {
80
81         case IT_QUOTA_DQACQ: {
82                 struct lquota_entry     *lqe;
83                 struct ldlm_lock        *lock;
84
85                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
86                         /* acquire on global lock? something is wrong ... */
87                         GOTO(out, rc = -EPROTO);
88
89                 /* verify global lock isn't stale */
90                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
91                         GOTO(out, rc = -ENOLCK);
92
93                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
94                 if (lock == NULL)
95                         GOTO(out, rc = -ENOLCK);
96                 LDLM_LOCK_PUT(lock);
97
98                 lqe = res->lr_lvb_data;
99                 LASSERT(lqe != NULL);
100                 lqe_getref(lqe);
101
102                 /* acquire quota space */
103                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
104                                 reqbody->qb_count, reqbody->qb_usage,
105                                 repbody);
106                 lqe_putref(lqe);
107                 if (rc)
108                         GOTO(out, rc);
109                 break;
110         }
111
112         case IT_QUOTA_CONN:
113                 /* new connection from slave */
114
115                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
116                         /* connection on per-ID lock? something is wrong ... */
117                         GOTO(out, rc = -EPROTO);
118
119                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
120                                        &repbody->qb_slv_fid,
121                                        &repbody->qb_slv_ver, uuid);
122                 if (rc)
123                         GOTO(out, rc);
124                 break;
125
126         default:
127                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
128                        it->opc);
129                 GOTO(out, rc = err_serious(-EINVAL));
130         }
131
132         /* on success, pack lvb in reply */
133         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
134         lvb_len = ldlm_lvbo_size(*lockp);
135         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
136         if (lvb_len < 0)
137                 GOTO(out, rc = lvb_len);
138
139         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
140         EXIT;
141 out:
142         return rc;
143 }
144
145 /*
146  * Initialize quota LVB associated with quota indexes.
147  * Called with res->lr_lvb_sem held
148  */
149 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
150 {
151         struct lu_env           *env;
152         struct qmt_thread_info  *qti;
153         struct qmt_device       *qmt = lu2qmt_dev(ld);
154         int                      pool_id, pool_type, qtype;
155         int                      rc;
156         ENTRY;
157
158         LASSERT(res != NULL);
159
160         if (res->lr_type != LDLM_PLAIN)
161                 RETURN(-ENOTSUPP);
162
163         if (res->lr_lvb_data ||
164             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
165                 RETURN(0);
166
167         OBD_ALLOC_PTR(env);
168         if (env == NULL)
169                 RETURN(-ENOMEM);
170
171         /* initialize environment */
172         rc = lu_env_init(env, LCT_MD_THREAD);
173         if (rc != 0)
174                 GOTO(out_free, rc);
175         qti = qmt_info(env);
176
177         /* extract global index FID and quota identifier */
178         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
179
180         /* sanity check the global index FID */
181         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
182         if (rc) {
183                 CERROR("can't extract pool information from FID "DFID"\n",
184                        PFID(&qti->qti_fid));
185                 GOTO(out, rc);
186         }
187
188         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
189                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
190                  * we are thus dealing with an ID lock. */
191                 struct lquota_entry     *lqe;
192
193                 /* Find the quota entry associated with the quota id */
194                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
195                                           &qti->qti_id);
196                 if (IS_ERR(lqe))
197                         GOTO(out, rc = PTR_ERR(lqe));
198
199                 /* store reference to lqe in lr_lvb_data */
200                 res->lr_lvb_data = lqe;
201                 LQUOTA_DEBUG(lqe, "initialized res lvb");
202         } else {
203                 struct dt_object        *obj;
204
205                 /* lookup global index */
206                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
207                 if (IS_ERR(obj))
208                         GOTO(out, rc = PTR_ERR(obj));
209                 if (!dt_object_exists(obj)) {
210                         lu_object_put(env, &obj->do_lu);
211                         GOTO(out, rc = -ENOENT);
212                 }
213
214                 /* store reference to global index object in lr_lvb_data */
215                 res->lr_lvb_data = obj;
216                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
217         }
218
219         res->lr_lvb_len = sizeof(struct lquota_lvb);
220         EXIT;
221 out:
222         lu_env_fini(env);
223 out_free:
224         OBD_FREE_PTR(env);
225         return rc;
226 }
227
228 /*
229  * Update LVB associated with the global quota index.
230  * This function is called from the DLM itself after a glimpse callback, in this
231  * case valid ptlrpc request is passed.
232  */
233 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
234                     struct ptlrpc_request *req, int increase_only)
235 {
236         struct lu_env           *env;
237         struct qmt_thread_info  *qti;
238         struct qmt_device       *qmt = lu2qmt_dev(ld);
239         struct lquota_entry     *lqe;
240         struct lquota_lvb       *lvb;
241         struct ldlm_lock        *lock;
242         struct obd_export       *exp;
243         int                      rc = 0;
244         ENTRY;
245
246         LASSERT(res != NULL);
247
248         if (req == NULL)
249                 RETURN(0);
250
251         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
252                 /* no need to update lvb for global quota locks */
253                 RETURN(0);
254
255         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
256                                           lustre_swab_lquota_lvb);
257         if (lvb == NULL) {
258                 CERROR("%s: failed to extract lvb from request\n",
259                        qmt->qmt_svname);
260                 RETURN(-EFAULT);
261         }
262
263         lqe = res->lr_lvb_data;
264         LASSERT(lqe != NULL);
265         lqe_getref(lqe);
266
267         LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
268                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
269
270         if (lvb->lvb_id_rel == 0) {
271                 /* nothing to release */
272                 if (lvb->lvb_id_may_rel != 0)
273                         /* but might still release later ... */
274                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
275                 GOTO(out_lqe, rc = 0);
276         }
277
278         /* allocate environement */
279         OBD_ALLOC_PTR(env);
280         if (env == NULL)
281                 GOTO(out_lqe, rc = -ENOMEM);
282
283         /* initialize environment */
284         rc = lu_env_init(env, LCT_MD_THREAD);
285         if (rc)
286                 GOTO(out_env, rc);
287         qti = qmt_info(env);
288
289         /* The request is a glimpse callback which was sent via the
290          * reverse import to the slave. What we care about here is the
291          * export associated with the slave and req->rq_export is
292          * definitely not what we are looking for (it is actually set to
293          * NULL here).
294          * Therefore we extract the lock from the request argument
295          * and use lock->l_export. */
296         lock = ldlm_request_lock(req);
297         if (IS_ERR(lock)) {
298                 CERROR("%s: failed to get lock from request!\n",
299                        qmt->qmt_svname);
300                 GOTO(out_env_init, rc = PTR_ERR(lock));
301         }
302
303         exp = class_export_get(lock->l_export);
304         if (exp == NULL) {
305                 CERROR("%s: failed to get export from lock!\n",
306                        qmt->qmt_svname);
307                 GOTO(out_env_init, rc = -EFAULT);
308         }
309
310         /* release quota space */
311         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
312                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
313         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
314                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
315                              LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
316                              lvb->lvb_id_rel, rc);
317         class_export_put(exp);
318         if (rc)
319                 GOTO(out_env_init, rc);
320         EXIT;
321 out_env_init:
322         lu_env_fini(env);
323 out_env:
324         OBD_FREE_PTR(env);
325 out_lqe:
326         lqe_putref(lqe);
327         return rc;
328 }
329
330 /*
331  * Report size of lvb to ldlm layer in order to allocate lvb buffer
332  * As far as quota locks are concerned, the size is static and is the same
333  * for both global and per-ID locks which shares the same lvb format.
334  */
335 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
336 {
337         return sizeof(struct lquota_lvb);
338 }
339
340 /*
341  * Fill request buffer with quota lvb
342  */
343 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
344                   int lvblen)
345 {
346         struct ldlm_resource    *res = lock->l_resource;
347         struct lquota_lvb       *qlvb = lvb;
348         ENTRY;
349
350         LASSERT(res != NULL);
351
352         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
353             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
354                 RETURN(-EINVAL);
355
356         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
357                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
358                  * we are thus dealing with an ID lock. */
359                 struct lquota_entry     *lqe = res->lr_lvb_data;
360
361                 /* return current qunit value & edquot flags in lvb */
362                 lqe_getref(lqe);
363                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
364                 qlvb->lvb_flags = 0;
365                 if (lqe->lqe_edquot)
366                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
367                 lqe_putref(lqe);
368         } else {
369                 /* global quota lock */
370                 struct lu_env           *env;
371                 int                      rc;
372                 struct dt_object        *obj = res->lr_lvb_data;
373
374                 OBD_ALLOC_PTR(env);
375                 if (env == NULL)
376                         RETURN(-ENOMEM);
377
378                 /* initialize environment */
379                 rc = lu_env_init(env, LCT_LOCAL);
380                 if (rc) {
381                         OBD_FREE_PTR(env);
382                         RETURN(rc);
383                 }
384
385                 /* return current version of global index */
386                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
387
388                 lu_env_fini(env);
389                 OBD_FREE_PTR(env);
390         }
391
392         RETURN(sizeof(struct lquota_lvb));
393 }
394
395 /*
396  * Free lvb associated with a given ldlm resource
397  * we don't really allocate a lvb, lr_lvb_data just points to
398  * the appropriate backend structures.
399  */
400 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
401 {
402         ENTRY;
403
404         if (res->lr_lvb_data == NULL)
405                 RETURN(0);
406
407         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
408                 struct lquota_entry     *lqe = res->lr_lvb_data;
409
410                 /* release lqe reference */
411                 lqe_putref(lqe);
412         } else {
413                 struct dt_object        *obj = res->lr_lvb_data;
414                 struct lu_env           *env;
415                 int                      rc;
416
417                 OBD_ALLOC_PTR(env);
418                 if (env == NULL)
419                         RETURN(-ENOMEM);
420
421                 /* initialize environment */
422                 rc = lu_env_init(env, LCT_LOCAL);
423                 if (rc) {
424                         OBD_FREE_PTR(env);
425                         RETURN(rc);
426                 }
427
428                 /* release object reference */
429                 lu_object_put(env, &obj->do_lu);
430                 lu_env_fini(env);
431                 OBD_FREE_PTR(env);
432         }
433
434         res->lr_lvb_data = NULL;
435         res->lr_lvb_len  = 0;
436
437         RETURN(0);
438 }
439
440 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
441                                 struct obd_uuid *, union ldlm_gl_desc *,
442                                 void *);
443 /*
444  * Send glimpse callback to slaves holding a lock on resource \res.
445  * This is used to notify slaves of new quota settings or to claim quota space
446  * back.
447  *
448  * \param env  - is the environment passed by the caller
449  * \param qmt  - is the quota master target
450  * \param res  - is the dlm resource associated with the quota object
451  * \param desc - is the glimpse descriptor to pack in glimpse callback
452  * \param cb   - is the callback function called on every lock and determine
453  *               whether a glimpse should be issued
454  * \param arg  - is an opaq parameter passed to the callback function
455  */
456 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
457                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
458                             qmt_glimpse_cb_t cb, void *arg)
459 {
460         cfs_list_t      *tmp, *pos;
461         CFS_LIST_HEAD(gl_list);
462         int              rc = 0;
463         ENTRY;
464
465         lock_res(res);
466         /* scan list of granted locks */
467         cfs_list_for_each(pos, &res->lr_granted) {
468                 struct ldlm_glimpse_work        *work;
469                 struct ldlm_lock                *lock;
470                 struct obd_uuid                 *uuid;
471
472                 lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
473                 LASSERT(lock->l_export);
474                 uuid = &lock->l_export->exp_client_uuid;
475
476                 if (cb != NULL) {
477                         rc = cb(env, qmt, uuid, desc, arg);
478                         if (rc == 0)
479                                 /* slave should not be notified */
480                                 continue;
481                         if (rc < 0)
482                                 /* something wrong happened, we still notify */
483                                 CERROR("%s: callback function failed to "
484                                        "determine whether slave %s should be "
485                                        "notified (%d)\n", qmt->qmt_svname,
486                                        obd_uuid2str(uuid), rc);
487                 }
488
489                 OBD_ALLOC_PTR(work);
490                 if (work == NULL) {
491                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
492                                obd_uuid2str(uuid));
493                         continue;
494                 }
495
496                 cfs_list_add_tail(&work->gl_list, &gl_list);
497                 work->gl_lock  = LDLM_LOCK_GET(lock);
498                 work->gl_flags = 0;
499                 work->gl_desc  = desc;
500
501         }
502         unlock_res(res);
503
504         if (cfs_list_empty(&gl_list)) {
505                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
506                 RETURN(0);
507         }
508
509         /* issue glimpse callbacks to all connected slaves */
510         rc = ldlm_glimpse_locks(res, &gl_list);
511
512         cfs_list_for_each_safe(pos, tmp, &gl_list) {
513                 struct ldlm_glimpse_work *work;
514
515                 work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
516
517                 cfs_list_del(&work->gl_list);
518                 CERROR("%s: failed to notify %s of new quota settings\n",
519                        qmt->qmt_svname,
520                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
521                 LDLM_LOCK_RELEASE(work->gl_lock);
522                 OBD_FREE_PTR(work);
523         }
524
525         RETURN(rc);
526 }
527
528 /*
529  * Send glimpse request to all global quota locks to push new quota setting to
530  * slaves.
531  *
532  * \param env - is the environment passed by the caller
533  * \param lqe - is the lquota entry which has new settings
534  * \param ver - is the version associated with the setting change
535  */
536 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
537                          __u64 ver)
538 {
539         struct qmt_thread_info  *qti = qmt_info(env);
540         struct qmt_pool_info    *pool = lqe2qpi(lqe);
541         struct ldlm_resource    *res = NULL;
542         int                      rc;
543         ENTRY;
544
545         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
546                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
547
548         /* send glimpse callback to notify slaves of new quota settings */
549         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
550         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
551         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
552         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
553         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
554         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
555
556         /* look up ldlm resource associated with global index */
557         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
558         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
559                                 LDLM_PLAIN, 0);
560         if (res == NULL) {
561                 /* this might happen if no slaves have enqueued global quota
562                  * locks yet */
563                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
564                              "with "DFID, PFID(&qti->qti_fid));
565                 RETURN_EXIT;
566         }
567
568         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
569                               NULL, NULL);
570         ldlm_resource_putref(res);
571         EXIT;
572 }
573
574 /* Callback function used to select locks that should be glimpsed when
575  * broadcasting the new qunit value */
576 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
577                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
578                           void *arg)
579 {
580         struct obd_uuid *slv_uuid = arg;
581
582         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
583                 RETURN(0);
584         RETURN(+1);
585 }
586
587 /*
588  * Send glimpse request on per-ID lock to push new qunit value to slave.
589  *
590  * \param env  - is the environment passed by the caller
591  * \param qmt  - is the quota master target device
592  * \param lqe  - is the lquota entry with the new qunit value
593  * \param uuid - is the uuid of the slave acquiring space, if any
594  */
595 static void qmt_id_lock_glimpse(const struct lu_env *env,
596                                 struct qmt_device *qmt,
597                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
598 {
599         struct qmt_thread_info  *qti = qmt_info(env);
600         struct qmt_pool_info    *pool = lqe2qpi(lqe);
601         struct ldlm_resource    *res = NULL;
602         int                      rc;
603         ENTRY;
604
605         if (!lqe->lqe_enforced)
606                 RETURN_EXIT;
607
608         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
609                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
610         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
611         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
612                                 0);
613         if (res == NULL) {
614                 /* this might legitimately happens if slaves haven't had the
615                  * opportunity to enqueue quota lock yet. */
616                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
617                              "lock "DFID, PFID(&qti->qti_fid));
618                 lqe_write_lock(lqe);
619                 if (lqe->lqe_revoke_time == 0 &&
620                     lqe->lqe_qunit == pool->qpi_least_qunit)
621                         lqe->lqe_revoke_time = cfs_time_current_64();
622                 lqe_write_unlock(lqe);
623                 RETURN_EXIT;
624         }
625
626         lqe_write_lock(lqe);
627         /* The purpose of glimpse callback on per-ID lock is twofold:
628          * - notify slaves of new qunit value and hope they will release some
629          *   spare quota space in return
630          * - notify slaves that master ran out of quota space and there is no
631          *   need to send acquire request any more until further notice */
632
633         /* fill glimpse descriptor with lqe settings */
634         if (lqe->lqe_edquot)
635                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
636         else
637                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
638         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
639
640         if (lqe->lqe_revoke_time == 0 &&
641             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
642                 /* reset lqe_may_rel, it will be updated on glimpse callback
643                  * replies if needed */
644                 lqe->lqe_may_rel = 0;
645
646         /* The rebalance thread is the only thread which can issue glimpses */
647         LASSERT(!lqe->lqe_gl);
648         lqe->lqe_gl = true;
649         lqe_write_unlock(lqe);
650
651         /* issue glimpse callback to slaves */
652         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
653                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
654
655         lqe_write_lock(lqe);
656         if (lqe->lqe_revoke_time == 0 &&
657             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
658             lqe->lqe_qunit == pool->qpi_least_qunit) {
659                 lqe->lqe_revoke_time = cfs_time_current_64();
660                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
661         }
662         LASSERT(lqe->lqe_gl);
663         lqe->lqe_gl = false;
664         lqe_write_unlock(lqe);
665
666         ldlm_resource_putref(res);
667         EXIT;
668 }
669
670 /*
671  * Schedule a glimpse request on per-ID locks to push new qunit value or
672  * edquot flag to quota slaves.
673  *
674  * \param qmt  - is the quota master target device
675  * \param lqe  - is the lquota entry with the new qunit value
676  */
677 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
678 {
679         bool    added = false;
680         ENTRY;
681
682         lqe_getref(lqe);
683         spin_lock(&qmt->qmt_reba_lock);
684         if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
685                 cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
686                 added = true;
687         }
688         spin_unlock(&qmt->qmt_reba_lock);
689
690         if (added)
691                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
692         else
693                 lqe_putref(lqe);
694         EXIT;
695 }
696
697 /*
698  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
699  * quota locks owned by slaves in order to notify them of:
700  * - a qunit shrink in which case slaves might release quota space back in
701  *   glimpse reply.
702  * - set/clear edquot flag used to cache the "quota exhausted" state of the
703  *   master. When the flag is set, slaves know that there is no need to
704  *   try to acquire quota from the master since this latter has already
705  *   distributed all the space.
706  */
707 static int qmt_reba_thread(void *arg)
708 {
709         struct qmt_device       *qmt = (struct qmt_device *)arg;
710         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
711         struct l_wait_info       lwi = { 0 };
712         struct lu_env           *env;
713         struct lquota_entry     *lqe, *tmp;
714         int                      rc;
715         ENTRY;
716
717         OBD_ALLOC_PTR(env);
718         if (env == NULL)
719                 RETURN(-ENOMEM);
720
721         rc = lu_env_init(env, LCT_MD_THREAD);
722         if (rc) {
723                 CERROR("%s: failed to init env.", qmt->qmt_svname);
724                 OBD_FREE_PTR(env);
725                 RETURN(rc);
726         }
727
728         thread_set_flags(thread, SVC_RUNNING);
729         wake_up(&thread->t_ctl_waitq);
730
731         while (1) {
732                 l_wait_event(thread->t_ctl_waitq,
733                              !cfs_list_empty(&qmt->qmt_reba_list) ||
734                              !thread_is_running(thread), &lwi);
735
736                 spin_lock(&qmt->qmt_reba_lock);
737                 cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
738                                              lqe_link) {
739                         cfs_list_del_init(&lqe->lqe_link);
740                         spin_unlock(&qmt->qmt_reba_lock);
741
742                         if (thread_is_running(thread))
743                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
744
745                         lqe_putref(lqe);
746                         spin_lock(&qmt->qmt_reba_lock);
747                 }
748                 spin_unlock(&qmt->qmt_reba_lock);
749
750                 if (!thread_is_running(thread))
751                         break;
752         }
753         lu_env_fini(env);
754         OBD_FREE_PTR(env);
755         thread_set_flags(thread, SVC_STOPPED);
756         wake_up(&thread->t_ctl_waitq);
757         RETURN(rc);
758 }
759
760 /*
761  * Start rebalance thread. Called when the QMT is being setup
762  */
763 int qmt_start_reba_thread(struct qmt_device *qmt)
764 {
765         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
766         struct l_wait_info       lwi    = { 0 };
767         struct task_struct              *task;
768         ENTRY;
769
770         task = kthread_run(qmt_reba_thread, (void *)qmt,
771                                "qmt_reba_%s", qmt->qmt_svname);
772         if (IS_ERR(task)) {
773                 CERROR("%s: failed to start rebalance thread (%ld)\n",
774                        qmt->qmt_svname, PTR_ERR(task));
775                 thread_set_flags(thread, SVC_STOPPED);
776                 RETURN(PTR_ERR(task));
777         }
778
779         l_wait_event(thread->t_ctl_waitq,
780                      thread_is_running(thread) || thread_is_stopped(thread),
781                      &lwi);
782
783         RETURN(0);
784 }
785
786 /*
787  * Stop rebalance thread. Called when the QMT is about to shutdown.
788  */
789 void qmt_stop_reba_thread(struct qmt_device *qmt)
790 {
791         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
792
793         if (!thread_is_stopped(thread)) {
794                 struct l_wait_info lwi = { 0 };
795
796                 thread_set_flags(thread, SVC_STOPPING);
797                 wake_up(&thread->t_ctl_waitq);
798
799                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
800                              &lwi);
801         }
802         LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
803 }