Whamcloud - gitweb
LU-8634 quota: fix return code of intent quota lock
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2014, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
41 /* intent policy function called from mdt_intent_opc() when the intent is of
42  * quota type */
43 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
44                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
45                       int flags)
46 {
47         struct qmt_device       *qmt = lu2qmt_dev(ld);
48         struct ldlm_intent      *it;
49         struct quota_body       *reqbody;
50         struct quota_body       *repbody;
51         struct obd_uuid         *uuid;
52         struct lquota_lvb       *lvb;
53         struct ldlm_resource    *res = (*lockp)->l_resource;
54         struct ldlm_reply       *ldlm_rep;
55         int                      rc, lvb_len;
56         ENTRY;
57
58         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60                              ldlm_lvbo_size(*lockp));
61
62         /* extract quota body and intent opc */
63         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
64         if (it == NULL)
65                 RETURN(err_serious(-EFAULT));
66
67         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
68         if (reqbody == NULL)
69                 RETURN(err_serious(-EFAULT));
70
71         /* prepare reply */
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc != 0) {
74                 CERROR("Can't pack response, rc %d\n", rc);
75                 RETURN(err_serious(rc));
76         }
77
78         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
79         if (repbody == NULL)
80                 RETURN(err_serious(-EFAULT));
81
82         ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
83         if (ldlm_rep == NULL)
84                 RETURN(err_serious(-EFAULT));
85
86         uuid = &(*lockp)->l_export->exp_client_uuid;
87         switch (it->opc) {
88
89         case IT_QUOTA_DQACQ: {
90                 struct lquota_entry     *lqe;
91                 struct ldlm_lock        *lock;
92
93                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
94                         /* acquire on global lock? something is wrong ... */
95                         GOTO(out, rc = -EPROTO);
96
97                 /* verify global lock isn't stale */
98                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
99                         GOTO(out, rc = -ENOLCK);
100
101                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
102                 if (lock == NULL)
103                         GOTO(out, rc = -ENOLCK);
104                 LDLM_LOCK_PUT(lock);
105
106                 lqe = res->lr_lvb_data;
107                 LASSERT(lqe != NULL);
108                 lqe_getref(lqe);
109
110                 /* acquire quota space */
111                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
112                                 reqbody->qb_count, reqbody->qb_usage,
113                                 repbody);
114                 lqe_putref(lqe);
115                 if (rc)
116                         GOTO(out, rc);
117                 break;
118         }
119
120         case IT_QUOTA_CONN:
121                 /* new connection from slave */
122
123                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
124                         /* connection on per-ID lock? something is wrong ... */
125                         GOTO(out, rc = -EPROTO);
126
127                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
128                                        &repbody->qb_slv_fid,
129                                        &repbody->qb_slv_ver, uuid);
130                 if (rc)
131                         GOTO(out, rc);
132                 break;
133
134         default:
135                 CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
136                        it->opc);
137                 GOTO(out, rc = -EINVAL);
138         }
139
140         /* on success, pack lvb in reply */
141         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
142         lvb_len = ldlm_lvbo_size(*lockp);
143         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
144         if (lvb_len < 0)
145                 GOTO(out, rc = lvb_len);
146
147         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
148 out:
149         ldlm_rep->lock_policy_res2 = clear_serious(rc);
150         EXIT;
151         return ELDLM_OK;
152 }
153
154 /*
155  * Initialize quota LVB associated with quota indexes.
156  * Called with res->lr_lvb_sem held
157  */
158 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
159 {
160         struct lu_env           *env;
161         struct qmt_thread_info  *qti;
162         struct qmt_device       *qmt = lu2qmt_dev(ld);
163         int                      pool_id, pool_type, qtype;
164         int                      rc;
165         ENTRY;
166
167         LASSERT(res != NULL);
168
169         if (res->lr_type != LDLM_PLAIN)
170                 RETURN(-ENOTSUPP);
171
172         if (res->lr_lvb_data ||
173             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
174                 RETURN(0);
175
176         OBD_ALLOC_PTR(env);
177         if (env == NULL)
178                 RETURN(-ENOMEM);
179
180         /* initialize environment */
181         rc = lu_env_init(env, LCT_MD_THREAD);
182         if (rc != 0)
183                 GOTO(out_free, rc);
184         qti = qmt_info(env);
185
186         /* extract global index FID and quota identifier */
187         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
188
189         /* sanity check the global index FID */
190         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
191         if (rc) {
192                 CERROR("can't extract pool information from FID "DFID"\n",
193                        PFID(&qti->qti_fid));
194                 GOTO(out, rc);
195         }
196
197         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
198                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
199                  * we are thus dealing with an ID lock. */
200                 struct lquota_entry     *lqe;
201
202                 /* Find the quota entry associated with the quota id */
203                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
204                                           &qti->qti_id);
205                 if (IS_ERR(lqe))
206                         GOTO(out, rc = PTR_ERR(lqe));
207
208                 /* store reference to lqe in lr_lvb_data */
209                 res->lr_lvb_data = lqe;
210                 LQUOTA_DEBUG(lqe, "initialized res lvb");
211         } else {
212                 struct dt_object        *obj;
213
214                 /* lookup global index */
215                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
216                 if (IS_ERR(obj))
217                         GOTO(out, rc = PTR_ERR(obj));
218                 if (!dt_object_exists(obj)) {
219                         lu_object_put(env, &obj->do_lu);
220                         GOTO(out, rc = -ENOENT);
221                 }
222
223                 /* store reference to global index object in lr_lvb_data */
224                 res->lr_lvb_data = obj;
225                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
226         }
227
228         res->lr_lvb_len = sizeof(struct lquota_lvb);
229         EXIT;
230 out:
231         lu_env_fini(env);
232 out_free:
233         OBD_FREE_PTR(env);
234         return rc;
235 }
236
237 /*
238  * Update LVB associated with the global quota index.
239  * This function is called from the DLM itself after a glimpse callback, in this
240  * case valid ptlrpc request is passed.
241  */
242 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
243                     struct ptlrpc_request *req, int increase_only)
244 {
245         struct lu_env           *env;
246         struct qmt_thread_info  *qti;
247         struct qmt_device       *qmt = lu2qmt_dev(ld);
248         struct lquota_entry     *lqe;
249         struct lquota_lvb       *lvb;
250         struct ldlm_lock        *lock;
251         struct obd_export       *exp;
252         int                      rc = 0;
253         ENTRY;
254
255         LASSERT(res != NULL);
256
257         if (req == NULL)
258                 RETURN(0);
259
260         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
261                 /* no need to update lvb for global quota locks */
262                 RETURN(0);
263
264         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
265                                           lustre_swab_lquota_lvb);
266         if (lvb == NULL) {
267                 CERROR("%s: failed to extract lvb from request\n",
268                        qmt->qmt_svname);
269                 RETURN(-EFAULT);
270         }
271
272         lqe = res->lr_lvb_data;
273         LASSERT(lqe != NULL);
274         lqe_getref(lqe);
275
276         LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
277                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
278
279         if (lvb->lvb_id_rel == 0) {
280                 /* nothing to release */
281                 if (lvb->lvb_id_may_rel != 0)
282                         /* but might still release later ... */
283                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
284                 GOTO(out_lqe, rc = 0);
285         }
286
287         /* allocate environement */
288         OBD_ALLOC_PTR(env);
289         if (env == NULL)
290                 GOTO(out_lqe, rc = -ENOMEM);
291
292         /* initialize environment */
293         rc = lu_env_init(env, LCT_MD_THREAD);
294         if (rc)
295                 GOTO(out_env, rc);
296         qti = qmt_info(env);
297
298         /* The request is a glimpse callback which was sent via the
299          * reverse import to the slave. What we care about here is the
300          * export associated with the slave and req->rq_export is
301          * definitely not what we are looking for (it is actually set to
302          * NULL here).
303          * Therefore we extract the lock from the request argument
304          * and use lock->l_export. */
305         lock = ldlm_request_lock(req);
306         if (IS_ERR(lock)) {
307                 CERROR("%s: failed to get lock from request!\n",
308                        qmt->qmt_svname);
309                 GOTO(out_env_init, rc = PTR_ERR(lock));
310         }
311
312         exp = class_export_get(lock->l_export);
313         if (exp == NULL) {
314                 CERROR("%s: failed to get export from lock!\n",
315                        qmt->qmt_svname);
316                 GOTO(out_env_init, rc = -EFAULT);
317         }
318
319         /* release quota space */
320         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
321                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
322         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
323                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
324                              "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
325                              lvb->lvb_id_rel, rc);
326         class_export_put(exp);
327         if (rc)
328                 GOTO(out_env_init, rc);
329         EXIT;
330 out_env_init:
331         lu_env_fini(env);
332 out_env:
333         OBD_FREE_PTR(env);
334 out_lqe:
335         lqe_putref(lqe);
336         return rc;
337 }
338
339 /*
340  * Report size of lvb to ldlm layer in order to allocate lvb buffer
341  * As far as quota locks are concerned, the size is static and is the same
342  * for both global and per-ID locks which shares the same lvb format.
343  */
344 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
345 {
346         return sizeof(struct lquota_lvb);
347 }
348
349 /*
350  * Fill request buffer with quota lvb
351  */
352 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
353                   int lvblen)
354 {
355         struct ldlm_resource    *res = lock->l_resource;
356         struct lquota_lvb       *qlvb = lvb;
357         ENTRY;
358
359         LASSERT(res != NULL);
360
361         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
362             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
363                 RETURN(-EINVAL);
364
365         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
366                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
367                  * we are thus dealing with an ID lock. */
368                 struct lquota_entry     *lqe = res->lr_lvb_data;
369
370                 /* return current qunit value & edquot flags in lvb */
371                 lqe_getref(lqe);
372                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
373                 qlvb->lvb_flags = 0;
374                 if (lqe->lqe_edquot)
375                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
376                 lqe_putref(lqe);
377         } else {
378                 /* global quota lock */
379                 struct lu_env           *env;
380                 int                      rc;
381                 struct dt_object        *obj = res->lr_lvb_data;
382
383                 OBD_ALLOC_PTR(env);
384                 if (env == NULL)
385                         RETURN(-ENOMEM);
386
387                 /* initialize environment */
388                 rc = lu_env_init(env, LCT_LOCAL);
389                 if (rc) {
390                         OBD_FREE_PTR(env);
391                         RETURN(rc);
392                 }
393
394                 /* return current version of global index */
395                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
396
397                 lu_env_fini(env);
398                 OBD_FREE_PTR(env);
399         }
400
401         RETURN(sizeof(struct lquota_lvb));
402 }
403
404 /*
405  * Free lvb associated with a given ldlm resource
406  * we don't really allocate a lvb, lr_lvb_data just points to
407  * the appropriate backend structures.
408  */
409 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
410 {
411         ENTRY;
412
413         if (res->lr_lvb_data == NULL)
414                 RETURN(0);
415
416         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
417                 struct lquota_entry     *lqe = res->lr_lvb_data;
418
419                 /* release lqe reference */
420                 lqe_putref(lqe);
421         } else {
422                 struct dt_object        *obj = res->lr_lvb_data;
423                 struct lu_env           *env;
424                 int                      rc;
425
426                 OBD_ALLOC_PTR(env);
427                 if (env == NULL)
428                         RETURN(-ENOMEM);
429
430                 /* initialize environment */
431                 rc = lu_env_init(env, LCT_LOCAL);
432                 if (rc) {
433                         OBD_FREE_PTR(env);
434                         RETURN(rc);
435                 }
436
437                 /* release object reference */
438                 lu_object_put(env, &obj->do_lu);
439                 lu_env_fini(env);
440                 OBD_FREE_PTR(env);
441         }
442
443         res->lr_lvb_data = NULL;
444         res->lr_lvb_len  = 0;
445
446         RETURN(0);
447 }
448
449 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
450                                 struct obd_uuid *, union ldlm_gl_desc *,
451                                 void *);
452 /*
453  * Send glimpse callback to slaves holding a lock on resource \res.
454  * This is used to notify slaves of new quota settings or to claim quota space
455  * back.
456  *
457  * \param env  - is the environment passed by the caller
458  * \param qmt  - is the quota master target
459  * \param res  - is the dlm resource associated with the quota object
460  * \param desc - is the glimpse descriptor to pack in glimpse callback
461  * \param cb   - is the callback function called on every lock and determine
462  *               whether a glimpse should be issued
463  * \param arg  - is an opaq parameter passed to the callback function
464  */
465 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
466                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
467                             qmt_glimpse_cb_t cb, void *arg)
468 {
469         struct list_head *tmp, *pos;
470         struct list_head gl_list = LIST_HEAD_INIT(gl_list);
471         int              rc = 0;
472         ENTRY;
473
474         lock_res(res);
475         /* scan list of granted locks */
476         list_for_each(pos, &res->lr_granted) {
477                 struct ldlm_glimpse_work        *work;
478                 struct ldlm_lock                *lock;
479                 struct obd_uuid                 *uuid;
480
481                 lock = list_entry(pos, struct ldlm_lock, l_res_link);
482                 LASSERT(lock->l_export);
483                 uuid = &lock->l_export->exp_client_uuid;
484
485                 if (cb != NULL) {
486                         rc = cb(env, qmt, uuid, desc, arg);
487                         if (rc == 0)
488                                 /* slave should not be notified */
489                                 continue;
490                         if (rc < 0)
491                                 /* something wrong happened, we still notify */
492                                 CERROR("%s: callback function failed to "
493                                        "determine whether slave %s should be "
494                                        "notified (%d)\n", qmt->qmt_svname,
495                                        obd_uuid2str(uuid), rc);
496                 }
497
498                 OBD_ALLOC_PTR(work);
499                 if (work == NULL) {
500                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
501                                obd_uuid2str(uuid));
502                         continue;
503                 }
504
505                 list_add_tail(&work->gl_list, &gl_list);
506                 work->gl_lock  = LDLM_LOCK_GET(lock);
507                 work->gl_flags = 0;
508                 work->gl_desc  = desc;
509
510         }
511         unlock_res(res);
512
513         if (list_empty(&gl_list)) {
514                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
515                 RETURN(0);
516         }
517
518         /* issue glimpse callbacks to all connected slaves */
519         rc = ldlm_glimpse_locks(res, &gl_list);
520
521         list_for_each_safe(pos, tmp, &gl_list) {
522                 struct ldlm_glimpse_work *work;
523
524                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
525
526                 list_del(&work->gl_list);
527                 CERROR("%s: failed to notify %s of new quota settings\n",
528                        qmt->qmt_svname,
529                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
530                 LDLM_LOCK_RELEASE(work->gl_lock);
531                 OBD_FREE_PTR(work);
532         }
533
534         RETURN(rc);
535 }
536
537 /*
538  * Send glimpse request to all global quota locks to push new quota setting to
539  * slaves.
540  *
541  * \param env - is the environment passed by the caller
542  * \param lqe - is the lquota entry which has new settings
543  * \param ver - is the version associated with the setting change
544  */
545 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
546                          __u64 ver)
547 {
548         struct qmt_thread_info  *qti = qmt_info(env);
549         struct qmt_pool_info    *pool = lqe2qpi(lqe);
550         struct ldlm_resource    *res = NULL;
551         ENTRY;
552
553         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
554                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
555
556         /* send glimpse callback to notify slaves of new quota settings */
557         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
558         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
559         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
560         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
561         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
562         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
563
564         /* look up ldlm resource associated with global index */
565         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
566         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
567                                 LDLM_PLAIN, 0);
568         if (IS_ERR(res)) {
569                 /* this might happen if no slaves have enqueued global quota
570                  * locks yet */
571                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
572                              "with "DFID, PFID(&qti->qti_fid));
573                 RETURN_EXIT;
574         }
575
576         qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
577                          NULL, NULL);
578         ldlm_resource_putref(res);
579         EXIT;
580 }
581
582 /* Callback function used to select locks that should be glimpsed when
583  * broadcasting the new qunit value */
584 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
585                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
586                           void *arg)
587 {
588         struct obd_uuid *slv_uuid = arg;
589
590         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
591                 RETURN(0);
592         RETURN(+1);
593 }
594
595 /*
596  * Send glimpse request on per-ID lock to push new qunit value to slave.
597  *
598  * \param env  - is the environment passed by the caller
599  * \param qmt  - is the quota master target device
600  * \param lqe  - is the lquota entry with the new qunit value
601  * \param uuid - is the uuid of the slave acquiring space, if any
602  */
603 static void qmt_id_lock_glimpse(const struct lu_env *env,
604                                 struct qmt_device *qmt,
605                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
606 {
607         struct qmt_thread_info  *qti = qmt_info(env);
608         struct qmt_pool_info    *pool = lqe2qpi(lqe);
609         struct ldlm_resource    *res = NULL;
610         ENTRY;
611
612         if (!lqe->lqe_enforced)
613                 RETURN_EXIT;
614
615         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
616                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
617         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
618         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
619                                 0);
620         if (IS_ERR(res)) {
621                 /* this might legitimately happens if slaves haven't had the
622                  * opportunity to enqueue quota lock yet. */
623                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
624                              "lock "DFID, PFID(&qti->qti_fid));
625                 lqe_write_lock(lqe);
626                 if (lqe->lqe_revoke_time == 0 &&
627                     lqe->lqe_qunit == pool->qpi_least_qunit)
628                         lqe->lqe_revoke_time = cfs_time_current_64();
629                 lqe_write_unlock(lqe);
630                 RETURN_EXIT;
631         }
632
633         lqe_write_lock(lqe);
634         /* The purpose of glimpse callback on per-ID lock is twofold:
635          * - notify slaves of new qunit value and hope they will release some
636          *   spare quota space in return
637          * - notify slaves that master ran out of quota space and there is no
638          *   need to send acquire request any more until further notice */
639
640         /* fill glimpse descriptor with lqe settings */
641         if (lqe->lqe_edquot)
642                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
643         else
644                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
645         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
646
647         if (lqe->lqe_revoke_time == 0 &&
648             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
649                 /* reset lqe_may_rel, it will be updated on glimpse callback
650                  * replies if needed */
651                 lqe->lqe_may_rel = 0;
652
653         /* The rebalance thread is the only thread which can issue glimpses */
654         LASSERT(!lqe->lqe_gl);
655         lqe->lqe_gl = true;
656         lqe_write_unlock(lqe);
657
658         /* issue glimpse callback to slaves */
659         qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
660                          uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
661
662         lqe_write_lock(lqe);
663         if (lqe->lqe_revoke_time == 0 &&
664             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
665             lqe->lqe_qunit == pool->qpi_least_qunit) {
666                 lqe->lqe_revoke_time = cfs_time_current_64();
667                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
668         }
669         LASSERT(lqe->lqe_gl);
670         lqe->lqe_gl = false;
671         lqe_write_unlock(lqe);
672
673         ldlm_resource_putref(res);
674         EXIT;
675 }
676
677 /*
678  * Schedule a glimpse request on per-ID locks to push new qunit value or
679  * edquot flag to quota slaves.
680  *
681  * \param qmt  - is the quota master target device
682  * \param lqe  - is the lquota entry with the new qunit value
683  */
684 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
685 {
686         bool    added = false;
687         ENTRY;
688
689         lqe_getref(lqe);
690         spin_lock(&qmt->qmt_reba_lock);
691         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
692                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
693                 added = true;
694         }
695         spin_unlock(&qmt->qmt_reba_lock);
696
697         if (added)
698                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
699         else
700                 lqe_putref(lqe);
701         EXIT;
702 }
703
704 /*
705  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
706  * quota locks owned by slaves in order to notify them of:
707  * - a qunit shrink in which case slaves might release quota space back in
708  *   glimpse reply.
709  * - set/clear edquot flag used to cache the "quota exhausted" state of the
710  *   master. When the flag is set, slaves know that there is no need to
711  *   try to acquire quota from the master since this latter has already
712  *   distributed all the space.
713  */
714 static int qmt_reba_thread(void *arg)
715 {
716         struct qmt_device       *qmt = (struct qmt_device *)arg;
717         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
718         struct l_wait_info       lwi = { 0 };
719         struct lu_env           *env;
720         struct lquota_entry     *lqe, *tmp;
721         int                      rc;
722         ENTRY;
723
724         OBD_ALLOC_PTR(env);
725         if (env == NULL)
726                 RETURN(-ENOMEM);
727
728         rc = lu_env_init(env, LCT_MD_THREAD);
729         if (rc) {
730                 CERROR("%s: failed to init env.", qmt->qmt_svname);
731                 OBD_FREE_PTR(env);
732                 RETURN(rc);
733         }
734
735         thread_set_flags(thread, SVC_RUNNING);
736         wake_up(&thread->t_ctl_waitq);
737
738         while (1) {
739                 l_wait_event(thread->t_ctl_waitq,
740                              !list_empty(&qmt->qmt_reba_list) ||
741                              !thread_is_running(thread), &lwi);
742
743                 spin_lock(&qmt->qmt_reba_lock);
744                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
745                                          lqe_link) {
746                         list_del_init(&lqe->lqe_link);
747                         spin_unlock(&qmt->qmt_reba_lock);
748
749                         if (thread_is_running(thread))
750                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
751
752                         lqe_putref(lqe);
753                         spin_lock(&qmt->qmt_reba_lock);
754                 }
755                 spin_unlock(&qmt->qmt_reba_lock);
756
757                 if (!thread_is_running(thread))
758                         break;
759         }
760         lu_env_fini(env);
761         OBD_FREE_PTR(env);
762         thread_set_flags(thread, SVC_STOPPED);
763         wake_up(&thread->t_ctl_waitq);
764         RETURN(rc);
765 }
766
767 /*
768  * Start rebalance thread. Called when the QMT is being setup
769  */
770 int qmt_start_reba_thread(struct qmt_device *qmt)
771 {
772         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
773         struct l_wait_info       lwi    = { 0 };
774         struct task_struct              *task;
775         ENTRY;
776
777         task = kthread_run(qmt_reba_thread, (void *)qmt,
778                                "qmt_reba_%s", qmt->qmt_svname);
779         if (IS_ERR(task)) {
780                 CERROR("%s: failed to start rebalance thread (%ld)\n",
781                        qmt->qmt_svname, PTR_ERR(task));
782                 thread_set_flags(thread, SVC_STOPPED);
783                 RETURN(PTR_ERR(task));
784         }
785
786         l_wait_event(thread->t_ctl_waitq,
787                      thread_is_running(thread) || thread_is_stopped(thread),
788                      &lwi);
789
790         RETURN(0);
791 }
792
793 /*
794  * Stop rebalance thread. Called when the QMT is about to shutdown.
795  */
796 void qmt_stop_reba_thread(struct qmt_device *qmt)
797 {
798         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
799
800         if (!thread_is_stopped(thread)) {
801                 struct l_wait_info lwi = { 0 };
802
803                 thread_set_flags(thread, SVC_STOPPING);
804                 wake_up(&thread->t_ctl_waitq);
805
806                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
807                              &lwi);
808         }
809         LASSERT(list_empty(&qmt->qmt_reba_list));
810 }