Whamcloud - gitweb
LU-7931 tests: setup/cleanup after every test script
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2014, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
41 /* intent policy function called from mdt_intent_opc() when the intent is of
42  * quota type */
43 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
44                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
45                       int flags)
46 {
47         struct qmt_device       *qmt = lu2qmt_dev(ld);
48         struct ldlm_intent      *it;
49         struct quota_body       *reqbody;
50         struct quota_body       *repbody;
51         struct obd_uuid         *uuid;
52         struct lquota_lvb       *lvb;
53         struct ldlm_resource    *res = (*lockp)->l_resource;
54         int                      rc, lvb_len;
55         ENTRY;
56
57         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
58         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
59                              ldlm_lvbo_size(*lockp));
60
61         /* extract quota body and intent opc */
62         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
63         if (it == NULL)
64                 RETURN(err_serious(-EFAULT));
65
66         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
67         if (reqbody == NULL)
68                 RETURN(err_serious(-EFAULT));
69
70         /* prepare reply */
71         rc = req_capsule_server_pack(&req->rq_pill);
72         if (rc != 0) {
73                 CERROR("Can't pack response, rc %d\n", rc);
74                 RETURN(err_serious(rc));
75         }
76
77         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
78         if (repbody == NULL)
79                 RETURN(err_serious(-EFAULT));
80
81         uuid = &(*lockp)->l_export->exp_client_uuid;
82         switch (it->opc) {
83
84         case IT_QUOTA_DQACQ: {
85                 struct lquota_entry     *lqe;
86                 struct ldlm_lock        *lock;
87
88                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
89                         /* acquire on global lock? something is wrong ... */
90                         GOTO(out, rc = -EPROTO);
91
92                 /* verify global lock isn't stale */
93                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
94                         GOTO(out, rc = -ENOLCK);
95
96                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
97                 if (lock == NULL)
98                         GOTO(out, rc = -ENOLCK);
99                 LDLM_LOCK_PUT(lock);
100
101                 lqe = res->lr_lvb_data;
102                 LASSERT(lqe != NULL);
103                 lqe_getref(lqe);
104
105                 /* acquire quota space */
106                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
107                                 reqbody->qb_count, reqbody->qb_usage,
108                                 repbody);
109                 lqe_putref(lqe);
110                 if (rc)
111                         GOTO(out, rc);
112                 break;
113         }
114
115         case IT_QUOTA_CONN:
116                 /* new connection from slave */
117
118                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
119                         /* connection on per-ID lock? something is wrong ... */
120                         GOTO(out, rc = -EPROTO);
121
122                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
123                                        &repbody->qb_slv_fid,
124                                        &repbody->qb_slv_ver, uuid);
125                 if (rc)
126                         GOTO(out, rc);
127                 break;
128
129         default:
130                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
131                        it->opc);
132                 GOTO(out, rc = err_serious(-EINVAL));
133         }
134
135         /* on success, pack lvb in reply */
136         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
137         lvb_len = ldlm_lvbo_size(*lockp);
138         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
139         if (lvb_len < 0)
140                 GOTO(out, rc = lvb_len);
141
142         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
143         EXIT;
144 out:
145         return rc;
146 }
147
148 /*
149  * Initialize quota LVB associated with quota indexes.
150  * Called with res->lr_lvb_sem held
151  */
152 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
153 {
154         struct lu_env           *env;
155         struct qmt_thread_info  *qti;
156         struct qmt_device       *qmt = lu2qmt_dev(ld);
157         int                      pool_id, pool_type, qtype;
158         int                      rc;
159         ENTRY;
160
161         LASSERT(res != NULL);
162
163         if (res->lr_type != LDLM_PLAIN)
164                 RETURN(-ENOTSUPP);
165
166         if (res->lr_lvb_data ||
167             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
168                 RETURN(0);
169
170         OBD_ALLOC_PTR(env);
171         if (env == NULL)
172                 RETURN(-ENOMEM);
173
174         /* initialize environment */
175         rc = lu_env_init(env, LCT_MD_THREAD);
176         if (rc != 0)
177                 GOTO(out_free, rc);
178         qti = qmt_info(env);
179
180         /* extract global index FID and quota identifier */
181         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
182
183         /* sanity check the global index FID */
184         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
185         if (rc) {
186                 CERROR("can't extract pool information from FID "DFID"\n",
187                        PFID(&qti->qti_fid));
188                 GOTO(out, rc);
189         }
190
191         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
192                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
193                  * we are thus dealing with an ID lock. */
194                 struct lquota_entry     *lqe;
195
196                 /* Find the quota entry associated with the quota id */
197                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
198                                           &qti->qti_id);
199                 if (IS_ERR(lqe))
200                         GOTO(out, rc = PTR_ERR(lqe));
201
202                 /* store reference to lqe in lr_lvb_data */
203                 res->lr_lvb_data = lqe;
204                 LQUOTA_DEBUG(lqe, "initialized res lvb");
205         } else {
206                 struct dt_object        *obj;
207
208                 /* lookup global index */
209                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
210                 if (IS_ERR(obj))
211                         GOTO(out, rc = PTR_ERR(obj));
212                 if (!dt_object_exists(obj)) {
213                         lu_object_put(env, &obj->do_lu);
214                         GOTO(out, rc = -ENOENT);
215                 }
216
217                 /* store reference to global index object in lr_lvb_data */
218                 res->lr_lvb_data = obj;
219                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
220         }
221
222         res->lr_lvb_len = sizeof(struct lquota_lvb);
223         EXIT;
224 out:
225         lu_env_fini(env);
226 out_free:
227         OBD_FREE_PTR(env);
228         return rc;
229 }
230
231 /*
232  * Update LVB associated with the global quota index.
233  * This function is called from the DLM itself after a glimpse callback, in this
234  * case valid ptlrpc request is passed.
235  */
236 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
237                     struct ptlrpc_request *req, int increase_only)
238 {
239         struct lu_env           *env;
240         struct qmt_thread_info  *qti;
241         struct qmt_device       *qmt = lu2qmt_dev(ld);
242         struct lquota_entry     *lqe;
243         struct lquota_lvb       *lvb;
244         struct ldlm_lock        *lock;
245         struct obd_export       *exp;
246         int                      rc = 0;
247         ENTRY;
248
249         LASSERT(res != NULL);
250
251         if (req == NULL)
252                 RETURN(0);
253
254         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
255                 /* no need to update lvb for global quota locks */
256                 RETURN(0);
257
258         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
259                                           lustre_swab_lquota_lvb);
260         if (lvb == NULL) {
261                 CERROR("%s: failed to extract lvb from request\n",
262                        qmt->qmt_svname);
263                 RETURN(-EFAULT);
264         }
265
266         lqe = res->lr_lvb_data;
267         LASSERT(lqe != NULL);
268         lqe_getref(lqe);
269
270         LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
271                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
272
273         if (lvb->lvb_id_rel == 0) {
274                 /* nothing to release */
275                 if (lvb->lvb_id_may_rel != 0)
276                         /* but might still release later ... */
277                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
278                 GOTO(out_lqe, rc = 0);
279         }
280
281         /* allocate environement */
282         OBD_ALLOC_PTR(env);
283         if (env == NULL)
284                 GOTO(out_lqe, rc = -ENOMEM);
285
286         /* initialize environment */
287         rc = lu_env_init(env, LCT_MD_THREAD);
288         if (rc)
289                 GOTO(out_env, rc);
290         qti = qmt_info(env);
291
292         /* The request is a glimpse callback which was sent via the
293          * reverse import to the slave. What we care about here is the
294          * export associated with the slave and req->rq_export is
295          * definitely not what we are looking for (it is actually set to
296          * NULL here).
297          * Therefore we extract the lock from the request argument
298          * and use lock->l_export. */
299         lock = ldlm_request_lock(req);
300         if (IS_ERR(lock)) {
301                 CERROR("%s: failed to get lock from request!\n",
302                        qmt->qmt_svname);
303                 GOTO(out_env_init, rc = PTR_ERR(lock));
304         }
305
306         exp = class_export_get(lock->l_export);
307         if (exp == NULL) {
308                 CERROR("%s: failed to get export from lock!\n",
309                        qmt->qmt_svname);
310                 GOTO(out_env_init, rc = -EFAULT);
311         }
312
313         /* release quota space */
314         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
315                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
316         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
317                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
318                              LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
319                              lvb->lvb_id_rel, rc);
320         class_export_put(exp);
321         if (rc)
322                 GOTO(out_env_init, rc);
323         EXIT;
324 out_env_init:
325         lu_env_fini(env);
326 out_env:
327         OBD_FREE_PTR(env);
328 out_lqe:
329         lqe_putref(lqe);
330         return rc;
331 }
332
333 /*
334  * Report size of lvb to ldlm layer in order to allocate lvb buffer
335  * As far as quota locks are concerned, the size is static and is the same
336  * for both global and per-ID locks which shares the same lvb format.
337  */
338 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
339 {
340         return sizeof(struct lquota_lvb);
341 }
342
343 /*
344  * Fill request buffer with quota lvb
345  */
346 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
347                   int lvblen)
348 {
349         struct ldlm_resource    *res = lock->l_resource;
350         struct lquota_lvb       *qlvb = lvb;
351         ENTRY;
352
353         LASSERT(res != NULL);
354
355         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
356             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
357                 RETURN(-EINVAL);
358
359         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
360                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
361                  * we are thus dealing with an ID lock. */
362                 struct lquota_entry     *lqe = res->lr_lvb_data;
363
364                 /* return current qunit value & edquot flags in lvb */
365                 lqe_getref(lqe);
366                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
367                 qlvb->lvb_flags = 0;
368                 if (lqe->lqe_edquot)
369                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
370                 lqe_putref(lqe);
371         } else {
372                 /* global quota lock */
373                 struct lu_env           *env;
374                 int                      rc;
375                 struct dt_object        *obj = res->lr_lvb_data;
376
377                 OBD_ALLOC_PTR(env);
378                 if (env == NULL)
379                         RETURN(-ENOMEM);
380
381                 /* initialize environment */
382                 rc = lu_env_init(env, LCT_LOCAL);
383                 if (rc) {
384                         OBD_FREE_PTR(env);
385                         RETURN(rc);
386                 }
387
388                 /* return current version of global index */
389                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
390
391                 lu_env_fini(env);
392                 OBD_FREE_PTR(env);
393         }
394
395         RETURN(sizeof(struct lquota_lvb));
396 }
397
398 /*
399  * Free lvb associated with a given ldlm resource
400  * we don't really allocate a lvb, lr_lvb_data just points to
401  * the appropriate backend structures.
402  */
403 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
404 {
405         ENTRY;
406
407         if (res->lr_lvb_data == NULL)
408                 RETURN(0);
409
410         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
411                 struct lquota_entry     *lqe = res->lr_lvb_data;
412
413                 /* release lqe reference */
414                 lqe_putref(lqe);
415         } else {
416                 struct dt_object        *obj = res->lr_lvb_data;
417                 struct lu_env           *env;
418                 int                      rc;
419
420                 OBD_ALLOC_PTR(env);
421                 if (env == NULL)
422                         RETURN(-ENOMEM);
423
424                 /* initialize environment */
425                 rc = lu_env_init(env, LCT_LOCAL);
426                 if (rc) {
427                         OBD_FREE_PTR(env);
428                         RETURN(rc);
429                 }
430
431                 /* release object reference */
432                 lu_object_put(env, &obj->do_lu);
433                 lu_env_fini(env);
434                 OBD_FREE_PTR(env);
435         }
436
437         res->lr_lvb_data = NULL;
438         res->lr_lvb_len  = 0;
439
440         RETURN(0);
441 }
442
443 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
444                                 struct obd_uuid *, union ldlm_gl_desc *,
445                                 void *);
446 /*
447  * Send glimpse callback to slaves holding a lock on resource \res.
448  * This is used to notify slaves of new quota settings or to claim quota space
449  * back.
450  *
451  * \param env  - is the environment passed by the caller
452  * \param qmt  - is the quota master target
453  * \param res  - is the dlm resource associated with the quota object
454  * \param desc - is the glimpse descriptor to pack in glimpse callback
455  * \param cb   - is the callback function called on every lock and determine
456  *               whether a glimpse should be issued
457  * \param arg  - is an opaq parameter passed to the callback function
458  */
459 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
460                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
461                             qmt_glimpse_cb_t cb, void *arg)
462 {
463         struct list_head *tmp, *pos;
464         struct list_head gl_list = LIST_HEAD_INIT(gl_list);
465         int              rc = 0;
466         ENTRY;
467
468         lock_res(res);
469         /* scan list of granted locks */
470         list_for_each(pos, &res->lr_granted) {
471                 struct ldlm_glimpse_work        *work;
472                 struct ldlm_lock                *lock;
473                 struct obd_uuid                 *uuid;
474
475                 lock = list_entry(pos, struct ldlm_lock, l_res_link);
476                 LASSERT(lock->l_export);
477                 uuid = &lock->l_export->exp_client_uuid;
478
479                 if (cb != NULL) {
480                         rc = cb(env, qmt, uuid, desc, arg);
481                         if (rc == 0)
482                                 /* slave should not be notified */
483                                 continue;
484                         if (rc < 0)
485                                 /* something wrong happened, we still notify */
486                                 CERROR("%s: callback function failed to "
487                                        "determine whether slave %s should be "
488                                        "notified (%d)\n", qmt->qmt_svname,
489                                        obd_uuid2str(uuid), rc);
490                 }
491
492                 OBD_ALLOC_PTR(work);
493                 if (work == NULL) {
494                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
495                                obd_uuid2str(uuid));
496                         continue;
497                 }
498
499                 list_add_tail(&work->gl_list, &gl_list);
500                 work->gl_lock  = LDLM_LOCK_GET(lock);
501                 work->gl_flags = 0;
502                 work->gl_desc  = desc;
503
504         }
505         unlock_res(res);
506
507         if (list_empty(&gl_list)) {
508                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
509                 RETURN(0);
510         }
511
512         /* issue glimpse callbacks to all connected slaves */
513         rc = ldlm_glimpse_locks(res, &gl_list);
514
515         list_for_each_safe(pos, tmp, &gl_list) {
516                 struct ldlm_glimpse_work *work;
517
518                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
519
520                 list_del(&work->gl_list);
521                 CERROR("%s: failed to notify %s of new quota settings\n",
522                        qmt->qmt_svname,
523                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
524                 LDLM_LOCK_RELEASE(work->gl_lock);
525                 OBD_FREE_PTR(work);
526         }
527
528         RETURN(rc);
529 }
530
531 /*
532  * Send glimpse request to all global quota locks to push new quota setting to
533  * slaves.
534  *
535  * \param env - is the environment passed by the caller
536  * \param lqe - is the lquota entry which has new settings
537  * \param ver - is the version associated with the setting change
538  */
539 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
540                          __u64 ver)
541 {
542         struct qmt_thread_info  *qti = qmt_info(env);
543         struct qmt_pool_info    *pool = lqe2qpi(lqe);
544         struct ldlm_resource    *res = NULL;
545         int                      rc;
546         ENTRY;
547
548         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
549                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
550
551         /* send glimpse callback to notify slaves of new quota settings */
552         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
553         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
554         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
555         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
556         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
557         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
558
559         /* look up ldlm resource associated with global index */
560         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
561         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
562                                 LDLM_PLAIN, 0);
563         if (IS_ERR(res)) {
564                 /* this might happen if no slaves have enqueued global quota
565                  * locks yet */
566                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
567                              "with "DFID, PFID(&qti->qti_fid));
568                 RETURN_EXIT;
569         }
570
571         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
572                               NULL, NULL);
573         ldlm_resource_putref(res);
574         EXIT;
575 }
576
577 /* Callback function used to select locks that should be glimpsed when
578  * broadcasting the new qunit value */
579 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
580                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
581                           void *arg)
582 {
583         struct obd_uuid *slv_uuid = arg;
584
585         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
586                 RETURN(0);
587         RETURN(+1);
588 }
589
590 /*
591  * Send glimpse request on per-ID lock to push new qunit value to slave.
592  *
593  * \param env  - is the environment passed by the caller
594  * \param qmt  - is the quota master target device
595  * \param lqe  - is the lquota entry with the new qunit value
596  * \param uuid - is the uuid of the slave acquiring space, if any
597  */
598 static void qmt_id_lock_glimpse(const struct lu_env *env,
599                                 struct qmt_device *qmt,
600                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
601 {
602         struct qmt_thread_info  *qti = qmt_info(env);
603         struct qmt_pool_info    *pool = lqe2qpi(lqe);
604         struct ldlm_resource    *res = NULL;
605         int                      rc;
606         ENTRY;
607
608         if (!lqe->lqe_enforced)
609                 RETURN_EXIT;
610
611         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
612                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
613         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
614         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
615                                 0);
616         if (IS_ERR(res)) {
617                 /* this might legitimately happens if slaves haven't had the
618                  * opportunity to enqueue quota lock yet. */
619                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
620                              "lock "DFID, PFID(&qti->qti_fid));
621                 lqe_write_lock(lqe);
622                 if (lqe->lqe_revoke_time == 0 &&
623                     lqe->lqe_qunit == pool->qpi_least_qunit)
624                         lqe->lqe_revoke_time = cfs_time_current_64();
625                 lqe_write_unlock(lqe);
626                 RETURN_EXIT;
627         }
628
629         lqe_write_lock(lqe);
630         /* The purpose of glimpse callback on per-ID lock is twofold:
631          * - notify slaves of new qunit value and hope they will release some
632          *   spare quota space in return
633          * - notify slaves that master ran out of quota space and there is no
634          *   need to send acquire request any more until further notice */
635
636         /* fill glimpse descriptor with lqe settings */
637         if (lqe->lqe_edquot)
638                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
639         else
640                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
641         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
642
643         if (lqe->lqe_revoke_time == 0 &&
644             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
645                 /* reset lqe_may_rel, it will be updated on glimpse callback
646                  * replies if needed */
647                 lqe->lqe_may_rel = 0;
648
649         /* The rebalance thread is the only thread which can issue glimpses */
650         LASSERT(!lqe->lqe_gl);
651         lqe->lqe_gl = true;
652         lqe_write_unlock(lqe);
653
654         /* issue glimpse callback to slaves */
655         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
656                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
657
658         lqe_write_lock(lqe);
659         if (lqe->lqe_revoke_time == 0 &&
660             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
661             lqe->lqe_qunit == pool->qpi_least_qunit) {
662                 lqe->lqe_revoke_time = cfs_time_current_64();
663                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
664         }
665         LASSERT(lqe->lqe_gl);
666         lqe->lqe_gl = false;
667         lqe_write_unlock(lqe);
668
669         ldlm_resource_putref(res);
670         EXIT;
671 }
672
673 /*
674  * Schedule a glimpse request on per-ID locks to push new qunit value or
675  * edquot flag to quota slaves.
676  *
677  * \param qmt  - is the quota master target device
678  * \param lqe  - is the lquota entry with the new qunit value
679  */
680 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
681 {
682         bool    added = false;
683         ENTRY;
684
685         lqe_getref(lqe);
686         spin_lock(&qmt->qmt_reba_lock);
687         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
688                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
689                 added = true;
690         }
691         spin_unlock(&qmt->qmt_reba_lock);
692
693         if (added)
694                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
695         else
696                 lqe_putref(lqe);
697         EXIT;
698 }
699
700 /*
701  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
702  * quota locks owned by slaves in order to notify them of:
703  * - a qunit shrink in which case slaves might release quota space back in
704  *   glimpse reply.
705  * - set/clear edquot flag used to cache the "quota exhausted" state of the
706  *   master. When the flag is set, slaves know that there is no need to
707  *   try to acquire quota from the master since this latter has already
708  *   distributed all the space.
709  */
710 static int qmt_reba_thread(void *arg)
711 {
712         struct qmt_device       *qmt = (struct qmt_device *)arg;
713         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
714         struct l_wait_info       lwi = { 0 };
715         struct lu_env           *env;
716         struct lquota_entry     *lqe, *tmp;
717         int                      rc;
718         ENTRY;
719
720         OBD_ALLOC_PTR(env);
721         if (env == NULL)
722                 RETURN(-ENOMEM);
723
724         rc = lu_env_init(env, LCT_MD_THREAD);
725         if (rc) {
726                 CERROR("%s: failed to init env.", qmt->qmt_svname);
727                 OBD_FREE_PTR(env);
728                 RETURN(rc);
729         }
730
731         thread_set_flags(thread, SVC_RUNNING);
732         wake_up(&thread->t_ctl_waitq);
733
734         while (1) {
735                 l_wait_event(thread->t_ctl_waitq,
736                              !list_empty(&qmt->qmt_reba_list) ||
737                              !thread_is_running(thread), &lwi);
738
739                 spin_lock(&qmt->qmt_reba_lock);
740                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
741                                          lqe_link) {
742                         list_del_init(&lqe->lqe_link);
743                         spin_unlock(&qmt->qmt_reba_lock);
744
745                         if (thread_is_running(thread))
746                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
747
748                         lqe_putref(lqe);
749                         spin_lock(&qmt->qmt_reba_lock);
750                 }
751                 spin_unlock(&qmt->qmt_reba_lock);
752
753                 if (!thread_is_running(thread))
754                         break;
755         }
756         lu_env_fini(env);
757         OBD_FREE_PTR(env);
758         thread_set_flags(thread, SVC_STOPPED);
759         wake_up(&thread->t_ctl_waitq);
760         RETURN(rc);
761 }
762
763 /*
764  * Start rebalance thread. Called when the QMT is being setup
765  */
766 int qmt_start_reba_thread(struct qmt_device *qmt)
767 {
768         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
769         struct l_wait_info       lwi    = { 0 };
770         struct task_struct              *task;
771         ENTRY;
772
773         task = kthread_run(qmt_reba_thread, (void *)qmt,
774                                "qmt_reba_%s", qmt->qmt_svname);
775         if (IS_ERR(task)) {
776                 CERROR("%s: failed to start rebalance thread (%ld)\n",
777                        qmt->qmt_svname, PTR_ERR(task));
778                 thread_set_flags(thread, SVC_STOPPED);
779                 RETURN(PTR_ERR(task));
780         }
781
782         l_wait_event(thread->t_ctl_waitq,
783                      thread_is_running(thread) || thread_is_stopped(thread),
784                      &lwi);
785
786         RETURN(0);
787 }
788
789 /*
790  * Stop rebalance thread. Called when the QMT is about to shutdown.
791  */
792 void qmt_stop_reba_thread(struct qmt_device *qmt)
793 {
794         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
795
796         if (!thread_is_stopped(thread)) {
797                 struct l_wait_info lwi = { 0 };
798
799                 thread_set_flags(thread, SVC_STOPPING);
800                 wake_up(&thread->t_ctl_waitq);
801
802                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
803                              &lwi);
804         }
805         LASSERT(list_empty(&qmt->qmt_reba_list));
806 }