Whamcloud - gitweb
LU-6245 libcfs: remove prim wrappers for libcfs
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2014, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34 #include <lustre_dlm.h>
35 #include <obd_class.h>
36
37 #include "qmt_internal.h"
38
39 /* intent policy function called from mdt_intent_opc() when the intent is of
40  * quota type */
41 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
42                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
43                       int flags)
44 {
45         struct qmt_device       *qmt = lu2qmt_dev(ld);
46         struct ldlm_intent      *it;
47         struct quota_body       *reqbody;
48         struct quota_body       *repbody;
49         struct obd_uuid         *uuid;
50         struct lquota_lvb       *lvb;
51         struct ldlm_resource    *res = (*lockp)->l_resource;
52         int                      rc, lvb_len;
53         ENTRY;
54
55         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
56         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
57                              ldlm_lvbo_size(*lockp));
58
59         /* extract quota body and intent opc */
60         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
61         if (it == NULL)
62                 RETURN(err_serious(-EFAULT));
63
64         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
65         if (reqbody == NULL)
66                 RETURN(err_serious(-EFAULT));
67
68         /* prepare reply */
69         rc = req_capsule_server_pack(&req->rq_pill);
70         if (rc != 0) {
71                 CERROR("Can't pack response, rc %d\n", rc);
72                 RETURN(err_serious(rc));
73         }
74
75         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
76         if (repbody == NULL)
77                 RETURN(err_serious(-EFAULT));
78
79         uuid = &(*lockp)->l_export->exp_client_uuid;
80         switch (it->opc) {
81
82         case IT_QUOTA_DQACQ: {
83                 struct lquota_entry     *lqe;
84                 struct ldlm_lock        *lock;
85
86                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
87                         /* acquire on global lock? something is wrong ... */
88                         GOTO(out, rc = -EPROTO);
89
90                 /* verify global lock isn't stale */
91                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
92                         GOTO(out, rc = -ENOLCK);
93
94                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
95                 if (lock == NULL)
96                         GOTO(out, rc = -ENOLCK);
97                 LDLM_LOCK_PUT(lock);
98
99                 lqe = res->lr_lvb_data;
100                 LASSERT(lqe != NULL);
101                 lqe_getref(lqe);
102
103                 /* acquire quota space */
104                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
105                                 reqbody->qb_count, reqbody->qb_usage,
106                                 repbody);
107                 lqe_putref(lqe);
108                 if (rc)
109                         GOTO(out, rc);
110                 break;
111         }
112
113         case IT_QUOTA_CONN:
114                 /* new connection from slave */
115
116                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
117                         /* connection on per-ID lock? something is wrong ... */
118                         GOTO(out, rc = -EPROTO);
119
120                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
121                                        &repbody->qb_slv_fid,
122                                        &repbody->qb_slv_ver, uuid);
123                 if (rc)
124                         GOTO(out, rc);
125                 break;
126
127         default:
128                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
129                        it->opc);
130                 GOTO(out, rc = err_serious(-EINVAL));
131         }
132
133         /* on success, pack lvb in reply */
134         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
135         lvb_len = ldlm_lvbo_size(*lockp);
136         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
137         if (lvb_len < 0)
138                 GOTO(out, rc = lvb_len);
139
140         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
141         EXIT;
142 out:
143         return rc;
144 }
145
146 /*
147  * Initialize quota LVB associated with quota indexes.
148  * Called with res->lr_lvb_sem held
149  */
150 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
151 {
152         struct lu_env           *env;
153         struct qmt_thread_info  *qti;
154         struct qmt_device       *qmt = lu2qmt_dev(ld);
155         int                      pool_id, pool_type, qtype;
156         int                      rc;
157         ENTRY;
158
159         LASSERT(res != NULL);
160
161         if (res->lr_type != LDLM_PLAIN)
162                 RETURN(-ENOTSUPP);
163
164         if (res->lr_lvb_data ||
165             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
166                 RETURN(0);
167
168         OBD_ALLOC_PTR(env);
169         if (env == NULL)
170                 RETURN(-ENOMEM);
171
172         /* initialize environment */
173         rc = lu_env_init(env, LCT_MD_THREAD);
174         if (rc != 0)
175                 GOTO(out_free, rc);
176         qti = qmt_info(env);
177
178         /* extract global index FID and quota identifier */
179         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
180
181         /* sanity check the global index FID */
182         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
183         if (rc) {
184                 CERROR("can't extract pool information from FID "DFID"\n",
185                        PFID(&qti->qti_fid));
186                 GOTO(out, rc);
187         }
188
189         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
190                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
191                  * we are thus dealing with an ID lock. */
192                 struct lquota_entry     *lqe;
193
194                 /* Find the quota entry associated with the quota id */
195                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
196                                           &qti->qti_id);
197                 if (IS_ERR(lqe))
198                         GOTO(out, rc = PTR_ERR(lqe));
199
200                 /* store reference to lqe in lr_lvb_data */
201                 res->lr_lvb_data = lqe;
202                 LQUOTA_DEBUG(lqe, "initialized res lvb");
203         } else {
204                 struct dt_object        *obj;
205
206                 /* lookup global index */
207                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
208                 if (IS_ERR(obj))
209                         GOTO(out, rc = PTR_ERR(obj));
210                 if (!dt_object_exists(obj)) {
211                         lu_object_put(env, &obj->do_lu);
212                         GOTO(out, rc = -ENOENT);
213                 }
214
215                 /* store reference to global index object in lr_lvb_data */
216                 res->lr_lvb_data = obj;
217                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
218         }
219
220         res->lr_lvb_len = sizeof(struct lquota_lvb);
221         EXIT;
222 out:
223         lu_env_fini(env);
224 out_free:
225         OBD_FREE_PTR(env);
226         return rc;
227 }
228
229 /*
230  * Update LVB associated with the global quota index.
231  * This function is called from the DLM itself after a glimpse callback, in this
232  * case valid ptlrpc request is passed.
233  */
234 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
235                     struct ptlrpc_request *req, int increase_only)
236 {
237         struct lu_env           *env;
238         struct qmt_thread_info  *qti;
239         struct qmt_device       *qmt = lu2qmt_dev(ld);
240         struct lquota_entry     *lqe;
241         struct lquota_lvb       *lvb;
242         struct ldlm_lock        *lock;
243         struct obd_export       *exp;
244         int                      rc = 0;
245         ENTRY;
246
247         LASSERT(res != NULL);
248
249         if (req == NULL)
250                 RETURN(0);
251
252         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
253                 /* no need to update lvb for global quota locks */
254                 RETURN(0);
255
256         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
257                                           lustre_swab_lquota_lvb);
258         if (lvb == NULL) {
259                 CERROR("%s: failed to extract lvb from request\n",
260                        qmt->qmt_svname);
261                 RETURN(-EFAULT);
262         }
263
264         lqe = res->lr_lvb_data;
265         LASSERT(lqe != NULL);
266         lqe_getref(lqe);
267
268         LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
269                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
270
271         if (lvb->lvb_id_rel == 0) {
272                 /* nothing to release */
273                 if (lvb->lvb_id_may_rel != 0)
274                         /* but might still release later ... */
275                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
276                 GOTO(out_lqe, rc = 0);
277         }
278
279         /* allocate environement */
280         OBD_ALLOC_PTR(env);
281         if (env == NULL)
282                 GOTO(out_lqe, rc = -ENOMEM);
283
284         /* initialize environment */
285         rc = lu_env_init(env, LCT_MD_THREAD);
286         if (rc)
287                 GOTO(out_env, rc);
288         qti = qmt_info(env);
289
290         /* The request is a glimpse callback which was sent via the
291          * reverse import to the slave. What we care about here is the
292          * export associated with the slave and req->rq_export is
293          * definitely not what we are looking for (it is actually set to
294          * NULL here).
295          * Therefore we extract the lock from the request argument
296          * and use lock->l_export. */
297         lock = ldlm_request_lock(req);
298         if (IS_ERR(lock)) {
299                 CERROR("%s: failed to get lock from request!\n",
300                        qmt->qmt_svname);
301                 GOTO(out_env_init, rc = PTR_ERR(lock));
302         }
303
304         exp = class_export_get(lock->l_export);
305         if (exp == NULL) {
306                 CERROR("%s: failed to get export from lock!\n",
307                        qmt->qmt_svname);
308                 GOTO(out_env_init, rc = -EFAULT);
309         }
310
311         /* release quota space */
312         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
313                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
314         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
315                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
316                              LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
317                              lvb->lvb_id_rel, rc);
318         class_export_put(exp);
319         if (rc)
320                 GOTO(out_env_init, rc);
321         EXIT;
322 out_env_init:
323         lu_env_fini(env);
324 out_env:
325         OBD_FREE_PTR(env);
326 out_lqe:
327         lqe_putref(lqe);
328         return rc;
329 }
330
331 /*
332  * Report size of lvb to ldlm layer in order to allocate lvb buffer
333  * As far as quota locks are concerned, the size is static and is the same
334  * for both global and per-ID locks which shares the same lvb format.
335  */
336 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
337 {
338         return sizeof(struct lquota_lvb);
339 }
340
341 /*
342  * Fill request buffer with quota lvb
343  */
344 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
345                   int lvblen)
346 {
347         struct ldlm_resource    *res = lock->l_resource;
348         struct lquota_lvb       *qlvb = lvb;
349         ENTRY;
350
351         LASSERT(res != NULL);
352
353         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
354             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
355                 RETURN(-EINVAL);
356
357         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
358                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
359                  * we are thus dealing with an ID lock. */
360                 struct lquota_entry     *lqe = res->lr_lvb_data;
361
362                 /* return current qunit value & edquot flags in lvb */
363                 lqe_getref(lqe);
364                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
365                 qlvb->lvb_flags = 0;
366                 if (lqe->lqe_edquot)
367                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
368                 lqe_putref(lqe);
369         } else {
370                 /* global quota lock */
371                 struct lu_env           *env;
372                 int                      rc;
373                 struct dt_object        *obj = res->lr_lvb_data;
374
375                 OBD_ALLOC_PTR(env);
376                 if (env == NULL)
377                         RETURN(-ENOMEM);
378
379                 /* initialize environment */
380                 rc = lu_env_init(env, LCT_LOCAL);
381                 if (rc) {
382                         OBD_FREE_PTR(env);
383                         RETURN(rc);
384                 }
385
386                 /* return current version of global index */
387                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
388
389                 lu_env_fini(env);
390                 OBD_FREE_PTR(env);
391         }
392
393         RETURN(sizeof(struct lquota_lvb));
394 }
395
396 /*
397  * Free lvb associated with a given ldlm resource
398  * we don't really allocate a lvb, lr_lvb_data just points to
399  * the appropriate backend structures.
400  */
401 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
402 {
403         ENTRY;
404
405         if (res->lr_lvb_data == NULL)
406                 RETURN(0);
407
408         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
409                 struct lquota_entry     *lqe = res->lr_lvb_data;
410
411                 /* release lqe reference */
412                 lqe_putref(lqe);
413         } else {
414                 struct dt_object        *obj = res->lr_lvb_data;
415                 struct lu_env           *env;
416                 int                      rc;
417
418                 OBD_ALLOC_PTR(env);
419                 if (env == NULL)
420                         RETURN(-ENOMEM);
421
422                 /* initialize environment */
423                 rc = lu_env_init(env, LCT_LOCAL);
424                 if (rc) {
425                         OBD_FREE_PTR(env);
426                         RETURN(rc);
427                 }
428
429                 /* release object reference */
430                 lu_object_put(env, &obj->do_lu);
431                 lu_env_fini(env);
432                 OBD_FREE_PTR(env);
433         }
434
435         res->lr_lvb_data = NULL;
436         res->lr_lvb_len  = 0;
437
438         RETURN(0);
439 }
440
441 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
442                                 struct obd_uuid *, union ldlm_gl_desc *,
443                                 void *);
444 /*
445  * Send glimpse callback to slaves holding a lock on resource \res.
446  * This is used to notify slaves of new quota settings or to claim quota space
447  * back.
448  *
449  * \param env  - is the environment passed by the caller
450  * \param qmt  - is the quota master target
451  * \param res  - is the dlm resource associated with the quota object
452  * \param desc - is the glimpse descriptor to pack in glimpse callback
453  * \param cb   - is the callback function called on every lock and determine
454  *               whether a glimpse should be issued
455  * \param arg  - is an opaq parameter passed to the callback function
456  */
457 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
458                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
459                             qmt_glimpse_cb_t cb, void *arg)
460 {
461         struct list_head *tmp, *pos;
462         struct list_head gl_list = LIST_HEAD_INIT(gl_list);
463         int              rc = 0;
464         ENTRY;
465
466         lock_res(res);
467         /* scan list of granted locks */
468         list_for_each(pos, &res->lr_granted) {
469                 struct ldlm_glimpse_work        *work;
470                 struct ldlm_lock                *lock;
471                 struct obd_uuid                 *uuid;
472
473                 lock = list_entry(pos, struct ldlm_lock, l_res_link);
474                 LASSERT(lock->l_export);
475                 uuid = &lock->l_export->exp_client_uuid;
476
477                 if (cb != NULL) {
478                         rc = cb(env, qmt, uuid, desc, arg);
479                         if (rc == 0)
480                                 /* slave should not be notified */
481                                 continue;
482                         if (rc < 0)
483                                 /* something wrong happened, we still notify */
484                                 CERROR("%s: callback function failed to "
485                                        "determine whether slave %s should be "
486                                        "notified (%d)\n", qmt->qmt_svname,
487                                        obd_uuid2str(uuid), rc);
488                 }
489
490                 OBD_ALLOC_PTR(work);
491                 if (work == NULL) {
492                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
493                                obd_uuid2str(uuid));
494                         continue;
495                 }
496
497                 list_add_tail(&work->gl_list, &gl_list);
498                 work->gl_lock  = LDLM_LOCK_GET(lock);
499                 work->gl_flags = 0;
500                 work->gl_desc  = desc;
501
502         }
503         unlock_res(res);
504
505         if (list_empty(&gl_list)) {
506                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
507                 RETURN(0);
508         }
509
510         /* issue glimpse callbacks to all connected slaves */
511         rc = ldlm_glimpse_locks(res, &gl_list);
512
513         list_for_each_safe(pos, tmp, &gl_list) {
514                 struct ldlm_glimpse_work *work;
515
516                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
517
518                 list_del(&work->gl_list);
519                 CERROR("%s: failed to notify %s of new quota settings\n",
520                        qmt->qmt_svname,
521                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
522                 LDLM_LOCK_RELEASE(work->gl_lock);
523                 OBD_FREE_PTR(work);
524         }
525
526         RETURN(rc);
527 }
528
529 /*
530  * Send glimpse request to all global quota locks to push new quota setting to
531  * slaves.
532  *
533  * \param env - is the environment passed by the caller
534  * \param lqe - is the lquota entry which has new settings
535  * \param ver - is the version associated with the setting change
536  */
537 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
538                          __u64 ver)
539 {
540         struct qmt_thread_info  *qti = qmt_info(env);
541         struct qmt_pool_info    *pool = lqe2qpi(lqe);
542         struct ldlm_resource    *res = NULL;
543         int                      rc;
544         ENTRY;
545
546         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
547                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
548
549         /* send glimpse callback to notify slaves of new quota settings */
550         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
551         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
552         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
553         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
554         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
555         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
556
557         /* look up ldlm resource associated with global index */
558         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
559         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
560                                 LDLM_PLAIN, 0);
561         if (IS_ERR(res)) {
562                 /* this might happen if no slaves have enqueued global quota
563                  * locks yet */
564                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
565                              "with "DFID, PFID(&qti->qti_fid));
566                 RETURN_EXIT;
567         }
568
569         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
570                               NULL, NULL);
571         ldlm_resource_putref(res);
572         EXIT;
573 }
574
575 /* Callback function used to select locks that should be glimpsed when
576  * broadcasting the new qunit value */
577 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
578                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
579                           void *arg)
580 {
581         struct obd_uuid *slv_uuid = arg;
582
583         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
584                 RETURN(0);
585         RETURN(+1);
586 }
587
588 /*
589  * Send glimpse request on per-ID lock to push new qunit value to slave.
590  *
591  * \param env  - is the environment passed by the caller
592  * \param qmt  - is the quota master target device
593  * \param lqe  - is the lquota entry with the new qunit value
594  * \param uuid - is the uuid of the slave acquiring space, if any
595  */
596 static void qmt_id_lock_glimpse(const struct lu_env *env,
597                                 struct qmt_device *qmt,
598                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
599 {
600         struct qmt_thread_info  *qti = qmt_info(env);
601         struct qmt_pool_info    *pool = lqe2qpi(lqe);
602         struct ldlm_resource    *res = NULL;
603         int                      rc;
604         ENTRY;
605
606         if (!lqe->lqe_enforced)
607                 RETURN_EXIT;
608
609         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
610                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
611         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
612         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
613                                 0);
614         if (IS_ERR(res)) {
615                 /* this might legitimately happens if slaves haven't had the
616                  * opportunity to enqueue quota lock yet. */
617                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
618                              "lock "DFID, PFID(&qti->qti_fid));
619                 lqe_write_lock(lqe);
620                 if (lqe->lqe_revoke_time == 0 &&
621                     lqe->lqe_qunit == pool->qpi_least_qunit)
622                         lqe->lqe_revoke_time = cfs_time_current_64();
623                 lqe_write_unlock(lqe);
624                 RETURN_EXIT;
625         }
626
627         lqe_write_lock(lqe);
628         /* The purpose of glimpse callback on per-ID lock is twofold:
629          * - notify slaves of new qunit value and hope they will release some
630          *   spare quota space in return
631          * - notify slaves that master ran out of quota space and there is no
632          *   need to send acquire request any more until further notice */
633
634         /* fill glimpse descriptor with lqe settings */
635         if (lqe->lqe_edquot)
636                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
637         else
638                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
639         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
640
641         if (lqe->lqe_revoke_time == 0 &&
642             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
643                 /* reset lqe_may_rel, it will be updated on glimpse callback
644                  * replies if needed */
645                 lqe->lqe_may_rel = 0;
646
647         /* The rebalance thread is the only thread which can issue glimpses */
648         LASSERT(!lqe->lqe_gl);
649         lqe->lqe_gl = true;
650         lqe_write_unlock(lqe);
651
652         /* issue glimpse callback to slaves */
653         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
654                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
655
656         lqe_write_lock(lqe);
657         if (lqe->lqe_revoke_time == 0 &&
658             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
659             lqe->lqe_qunit == pool->qpi_least_qunit) {
660                 lqe->lqe_revoke_time = cfs_time_current_64();
661                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
662         }
663         LASSERT(lqe->lqe_gl);
664         lqe->lqe_gl = false;
665         lqe_write_unlock(lqe);
666
667         ldlm_resource_putref(res);
668         EXIT;
669 }
670
671 /*
672  * Schedule a glimpse request on per-ID locks to push new qunit value or
673  * edquot flag to quota slaves.
674  *
675  * \param qmt  - is the quota master target device
676  * \param lqe  - is the lquota entry with the new qunit value
677  */
678 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
679 {
680         bool    added = false;
681         ENTRY;
682
683         lqe_getref(lqe);
684         spin_lock(&qmt->qmt_reba_lock);
685         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
686                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
687                 added = true;
688         }
689         spin_unlock(&qmt->qmt_reba_lock);
690
691         if (added)
692                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
693         else
694                 lqe_putref(lqe);
695         EXIT;
696 }
697
698 /*
699  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
700  * quota locks owned by slaves in order to notify them of:
701  * - a qunit shrink in which case slaves might release quota space back in
702  *   glimpse reply.
703  * - set/clear edquot flag used to cache the "quota exhausted" state of the
704  *   master. When the flag is set, slaves know that there is no need to
705  *   try to acquire quota from the master since this latter has already
706  *   distributed all the space.
707  */
708 static int qmt_reba_thread(void *arg)
709 {
710         struct qmt_device       *qmt = (struct qmt_device *)arg;
711         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
712         struct l_wait_info       lwi = { 0 };
713         struct lu_env           *env;
714         struct lquota_entry     *lqe, *tmp;
715         int                      rc;
716         ENTRY;
717
718         OBD_ALLOC_PTR(env);
719         if (env == NULL)
720                 RETURN(-ENOMEM);
721
722         rc = lu_env_init(env, LCT_MD_THREAD);
723         if (rc) {
724                 CERROR("%s: failed to init env.", qmt->qmt_svname);
725                 OBD_FREE_PTR(env);
726                 RETURN(rc);
727         }
728
729         thread_set_flags(thread, SVC_RUNNING);
730         wake_up(&thread->t_ctl_waitq);
731
732         while (1) {
733                 l_wait_event(thread->t_ctl_waitq,
734                              !list_empty(&qmt->qmt_reba_list) ||
735                              !thread_is_running(thread), &lwi);
736
737                 spin_lock(&qmt->qmt_reba_lock);
738                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
739                                          lqe_link) {
740                         list_del_init(&lqe->lqe_link);
741                         spin_unlock(&qmt->qmt_reba_lock);
742
743                         if (thread_is_running(thread))
744                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
745
746                         lqe_putref(lqe);
747                         spin_lock(&qmt->qmt_reba_lock);
748                 }
749                 spin_unlock(&qmt->qmt_reba_lock);
750
751                 if (!thread_is_running(thread))
752                         break;
753         }
754         lu_env_fini(env);
755         OBD_FREE_PTR(env);
756         thread_set_flags(thread, SVC_STOPPED);
757         wake_up(&thread->t_ctl_waitq);
758         RETURN(rc);
759 }
760
761 /*
762  * Start rebalance thread. Called when the QMT is being setup
763  */
764 int qmt_start_reba_thread(struct qmt_device *qmt)
765 {
766         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
767         struct l_wait_info       lwi    = { 0 };
768         struct task_struct              *task;
769         ENTRY;
770
771         task = kthread_run(qmt_reba_thread, (void *)qmt,
772                                "qmt_reba_%s", qmt->qmt_svname);
773         if (IS_ERR(task)) {
774                 CERROR("%s: failed to start rebalance thread (%ld)\n",
775                        qmt->qmt_svname, PTR_ERR(task));
776                 thread_set_flags(thread, SVC_STOPPED);
777                 RETURN(PTR_ERR(task));
778         }
779
780         l_wait_event(thread->t_ctl_waitq,
781                      thread_is_running(thread) || thread_is_stopped(thread),
782                      &lwi);
783
784         RETURN(0);
785 }
786
787 /*
788  * Stop rebalance thread. Called when the QMT is about to shutdown.
789  */
790 void qmt_stop_reba_thread(struct qmt_device *qmt)
791 {
792         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
793
794         if (!thread_is_stopped(thread)) {
795                 struct l_wait_info lwi = { 0 };
796
797                 thread_set_flags(thread, SVC_STOPPING);
798                 wake_up(&thread->t_ctl_waitq);
799
800                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
801                              &lwi);
802         }
803         LASSERT(list_empty(&qmt->qmt_reba_list));
804 }