Whamcloud - gitweb
LU-3157 llite: A not locked mutex can be unlocked.
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qmt_internal.h"
41
42 /* intent policy function called from mdt_intent_opc() when the intent is of
43  * quota type */
44 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
45                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
46                       int flags)
47 {
48         struct qmt_device       *qmt = lu2qmt_dev(ld);
49         struct ldlm_intent      *it;
50         struct quota_body       *reqbody;
51         struct quota_body       *repbody;
52         struct obd_uuid         *uuid;
53         struct lquota_lvb       *lvb;
54         struct ldlm_resource    *res = (*lockp)->l_resource;
55         int                      rc, lvb_len;
56         ENTRY;
57
58         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60                              ldlm_lvbo_size(*lockp));
61
62         /* extract quota body and intent opc */
63         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
64         if (it == NULL)
65                 RETURN(err_serious(-EFAULT));
66
67         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
68         if (reqbody == NULL)
69                 RETURN(err_serious(-EFAULT));
70
71         /* prepare reply */
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc != 0) {
74                 CERROR("Can't pack response, rc %d\n", rc);
75                 RETURN(err_serious(rc));
76         }
77
78         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
79         if (repbody == NULL)
80                 RETURN(err_serious(-EFAULT));
81
82         uuid = &(*lockp)->l_export->exp_client_uuid;
83         switch (it->opc) {
84
85         case IT_QUOTA_DQACQ: {
86                 struct lquota_entry     *lqe;
87                 struct ldlm_lock        *lock;
88
89                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
90                         /* acquire on global lock? something is wrong ... */
91                         GOTO(out, rc = -EPROTO);
92
93                 /* verify global lock isn't stale */
94                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
95                         GOTO(out, rc = -ENOLCK);
96
97                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
98                 if (lock == NULL)
99                         GOTO(out, rc = -ENOLCK);
100                 LDLM_LOCK_PUT(lock);
101
102                 lqe = res->lr_lvb_data;
103                 LASSERT(lqe != NULL);
104                 lqe_getref(lqe);
105
106                 /* acquire quota space */
107                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
108                                 reqbody->qb_count, reqbody->qb_usage,
109                                 repbody);
110                 lqe_putref(lqe);
111                 if (rc)
112                         GOTO(out, rc);
113                 break;
114         }
115
116         case IT_QUOTA_CONN:
117                 /* new connection from slave */
118
119                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
120                         /* connection on per-ID lock? something is wrong ... */
121                         GOTO(out, rc = -EPROTO);
122
123                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
124                                        &repbody->qb_slv_fid,
125                                        &repbody->qb_slv_ver, uuid);
126                 if (rc)
127                         GOTO(out, rc);
128                 break;
129
130         default:
131                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
132                        it->opc);
133                 GOTO(out, rc = err_serious(-EINVAL));
134         }
135
136         /* on success, pack lvb in reply */
137         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
138         lvb_len = ldlm_lvbo_size(*lockp);
139         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
140         if (lvb_len < 0)
141                 GOTO(out, rc = lvb_len);
142
143         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
144         EXIT;
145 out:
146         return rc;
147 }
148
149 /*
150  * Initialize quota LVB associated with quota indexes.
151  * Called with res->lr_lvb_sem held
152  */
153 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
154 {
155         struct lu_env           *env;
156         struct qmt_thread_info  *qti;
157         struct qmt_device       *qmt = lu2qmt_dev(ld);
158         int                      pool_id, pool_type, qtype;
159         int                      rc;
160         ENTRY;
161
162         LASSERT(res != NULL);
163
164         if (res->lr_type != LDLM_PLAIN)
165                 RETURN(-ENOTSUPP);
166
167         if (res->lr_lvb_data ||
168             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
169                 RETURN(0);
170
171         OBD_ALLOC_PTR(env);
172         if (env == NULL)
173                 RETURN(-ENOMEM);
174
175         /* initialize environment */
176         rc = lu_env_init(env, LCT_MD_THREAD);
177         if (rc) {
178                 OBD_FREE_PTR(env);
179                 RETURN(rc);
180         }
181         qti = qmt_info(env);
182
183         /* extract global index FID and quota identifier */
184         fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
185
186         /* sanity check the global index FID */
187         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
188         if (rc) {
189                 CERROR("can't extract pool information from FID "DFID"\n",
190                        PFID(&qti->qti_fid));
191                 GOTO(out, rc);
192         }
193
194         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
195                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
196                  * we are thus dealing with an ID lock. */
197                 struct lquota_entry     *lqe;
198
199                 /* Find the quota entry associated with the quota id */
200                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
201                                           &qti->qti_id);
202                 if (IS_ERR(lqe))
203                         GOTO(out, rc = PTR_ERR(lqe));
204
205                 /* store reference to lqe in lr_lvb_data */
206                 res->lr_lvb_data = lqe;
207                 LQUOTA_DEBUG(lqe, "initialized res lvb");
208         } else {
209                 struct dt_object        *obj;
210
211                 /* lookup global index */
212                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
213                 if (IS_ERR(obj))
214                         GOTO(out, rc = PTR_ERR(obj));
215                 if (!dt_object_exists(obj)) {
216                         lu_object_put(env, &obj->do_lu);
217                         GOTO(out, rc = -ENOENT);
218                 }
219
220                 /* store reference to global index object in lr_lvb_data */
221                 res->lr_lvb_data = obj;
222                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
223         }
224
225         res->lr_lvb_len  = sizeof(struct lquota_lvb);
226         EXIT;
227 out:
228         lu_env_fini(env);
229         OBD_FREE_PTR(env);
230         return rc;
231 }
232
233 /*
234  * Update LVB associated with the global quota index.
235  * This function is called from the DLM itself after a glimpse callback, in this
236  * case valid ptlrpc request is passed.
237  */
238 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
239                     struct ptlrpc_request *req, int increase_only)
240 {
241         struct lu_env           *env;
242         struct qmt_thread_info  *qti;
243         struct qmt_device       *qmt = lu2qmt_dev(ld);
244         struct lquota_entry     *lqe;
245         struct lquota_lvb       *lvb;
246         struct ldlm_lock        *lock;
247         struct obd_export       *exp;
248         int                      rc = 0;
249         ENTRY;
250
251         LASSERT(res != NULL);
252
253         if (req == NULL)
254                 RETURN(0);
255
256         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
257                 /* no need to update lvb for global quota locks */
258                 RETURN(0);
259
260         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
261                                           lustre_swab_lquota_lvb);
262         if (lvb == NULL) {
263                 CERROR("%s: failed to extract lvb from request\n",
264                        qmt->qmt_svname);
265                 RETURN(-EFAULT);
266         }
267
268         lqe = res->lr_lvb_data;
269         LASSERT(lqe != NULL);
270         lqe_getref(lqe);
271
272         LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
273                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
274
275         if (lvb->lvb_id_rel == 0) {
276                 /* nothing to release */
277                 if (lvb->lvb_id_may_rel != 0)
278                         /* but might still release later ... */
279                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
280                 GOTO(out_lqe, rc = 0);
281         }
282
283         /* allocate environement */
284         OBD_ALLOC_PTR(env);
285         if (env == NULL)
286                 GOTO(out_lqe, rc = -ENOMEM);
287
288         /* initialize environment */
289         rc = lu_env_init(env, LCT_MD_THREAD);
290         if (rc)
291                 GOTO(out_env, rc);
292         qti = qmt_info(env);
293
294         /* The request is a glimpse callback which was sent via the
295          * reverse import to the slave. What we care about here is the
296          * export associated with the slave and req->rq_export is
297          * definitely not what we are looking for (it is actually set to
298          * NULL here).
299          * Therefore we extract the lock from the request argument
300          * and use lock->l_export. */
301         lock = ldlm_request_lock(req);
302         if (IS_ERR(lock)) {
303                 CERROR("%s: failed to get lock from request!\n",
304                        qmt->qmt_svname);
305                 GOTO(out_env_init, rc = PTR_ERR(lock));
306         }
307
308         exp = class_export_get(lock->l_export);
309         if (exp == NULL) {
310                 CERROR("%s: failed to get export from lock!\n",
311                        qmt->qmt_svname);
312                 GOTO(out_env_init, rc = -EFAULT);
313         }
314
315         /* release quota space */
316         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
317                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
318         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
319                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
320                              LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
321                              lvb->lvb_id_rel, rc);
322         class_export_put(exp);
323         if (rc)
324                 GOTO(out_env_init, rc);
325         EXIT;
326 out_env_init:
327         lu_env_fini(env);
328 out_env:
329         OBD_FREE_PTR(env);
330 out_lqe:
331         lqe_putref(lqe);
332         return rc;
333 }
334
335 /*
336  * Report size of lvb to ldlm layer in order to allocate lvb buffer
337  * As far as quota locks are concerned, the size is static and is the same
338  * for both global and per-ID locks which shares the same lvb format.
339  */
340 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
341 {
342         return sizeof(struct lquota_lvb);
343 }
344
345 /*
346  * Fill request buffer with quota lvb
347  */
348 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
349                   int lvblen)
350 {
351         struct ldlm_resource    *res = lock->l_resource;
352         struct lquota_lvb       *qlvb = lvb;
353         ENTRY;
354
355         LASSERT(res != NULL);
356
357         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
358             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
359                 RETURN(-EINVAL);
360
361         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
362                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
363                  * we are thus dealing with an ID lock. */
364                 struct lquota_entry     *lqe = res->lr_lvb_data;
365
366                 /* return current qunit value & edquot flags in lvb */
367                 lqe_getref(lqe);
368                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
369                 qlvb->lvb_flags = 0;
370                 if (lqe->lqe_edquot)
371                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
372                 lqe_putref(lqe);
373         } else {
374                 /* global quota lock */
375                 struct lu_env           *env;
376                 int                      rc;
377                 struct dt_object        *obj = res->lr_lvb_data;
378
379                 OBD_ALLOC_PTR(env);
380                 if (env == NULL)
381                         RETURN(-ENOMEM);
382
383                 /* initialize environment */
384                 rc = lu_env_init(env, LCT_LOCAL);
385                 if (rc) {
386                         OBD_FREE_PTR(env);
387                         RETURN(rc);
388                 }
389
390                 /* return current version of global index */
391                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
392
393                 lu_env_fini(env);
394                 OBD_FREE_PTR(env);
395         }
396
397         RETURN(sizeof(struct lquota_lvb));
398 }
399
400 /*
401  * Free lvb associated with a given ldlm resource
402  * we don't really allocate a lvb, lr_lvb_data just points to
403  * the appropriate backend structures.
404  */
405 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
406 {
407         ENTRY;
408
409         if (res->lr_lvb_data == NULL)
410                 RETURN(0);
411
412         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
413                 struct lquota_entry     *lqe = res->lr_lvb_data;
414
415                 /* release lqe reference */
416                 lqe_putref(lqe);
417         } else {
418                 struct dt_object        *obj = res->lr_lvb_data;
419                 struct lu_env           *env;
420                 int                      rc;
421
422                 OBD_ALLOC_PTR(env);
423                 if (env == NULL)
424                         RETURN(-ENOMEM);
425
426                 /* initialize environment */
427                 rc = lu_env_init(env, LCT_LOCAL);
428                 if (rc) {
429                         OBD_FREE_PTR(env);
430                         RETURN(rc);
431                 }
432
433                 /* release object reference */
434                 lu_object_put(env, &obj->do_lu);
435                 lu_env_fini(env);
436                 OBD_FREE_PTR(env);
437         }
438
439         res->lr_lvb_data = NULL;
440         res->lr_lvb_len  = 0;
441
442         RETURN(0);
443 }
444
445 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
446                                 struct obd_uuid *, union ldlm_gl_desc *,
447                                 void *);
448 /*
449  * Send glimpse callback to slaves holding a lock on resource \res.
450  * This is used to notify slaves of new quota settings or to claim quota space
451  * back.
452  *
453  * \param env  - is the environment passed by the caller
454  * \param qmt  - is the quota master target
455  * \param res  - is the dlm resource associated with the quota object
456  * \param desc - is the glimpse descriptor to pack in glimpse callback
457  * \param cb   - is the callback function called on every lock and determine
458  *               whether a glimpse should be issued
459  * \param arg  - is an opaq parameter passed to the callback function
460  */
461 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
462                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
463                             qmt_glimpse_cb_t cb, void *arg)
464 {
465         cfs_list_t      *tmp, *pos;
466         CFS_LIST_HEAD(gl_list);
467         int              rc = 0;
468         ENTRY;
469
470         lock_res(res);
471         /* scan list of granted locks */
472         cfs_list_for_each(pos, &res->lr_granted) {
473                 struct ldlm_glimpse_work        *work;
474                 struct ldlm_lock                *lock;
475                 struct obd_uuid                 *uuid;
476
477                 lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
478                 LASSERT(lock->l_export);
479                 uuid = &lock->l_export->exp_client_uuid;
480
481                 if (cb != NULL) {
482                         rc = cb(env, qmt, uuid, desc, arg);
483                         if (rc == 0)
484                                 /* slave should not be notified */
485                                 continue;
486                         if (rc < 0)
487                                 /* something wrong happened, we still notify */
488                                 CERROR("%s: callback function failed to "
489                                        "determine whether slave %s should be "
490                                        "notified (%d)\n", qmt->qmt_svname,
491                                        obd_uuid2str(uuid), rc);
492                 }
493
494                 OBD_ALLOC_PTR(work);
495                 if (work == NULL) {
496                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
497                                obd_uuid2str(uuid));
498                         continue;
499                 }
500
501                 cfs_list_add_tail(&work->gl_list, &gl_list);
502                 work->gl_lock  = LDLM_LOCK_GET(lock);
503                 work->gl_flags = 0;
504                 work->gl_desc  = desc;
505
506         }
507         unlock_res(res);
508
509         if (cfs_list_empty(&gl_list)) {
510                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
511                 RETURN(0);
512         }
513
514         /* issue glimpse callbacks to all connected slaves */
515         rc = ldlm_glimpse_locks(res, &gl_list);
516
517         cfs_list_for_each_safe(pos, tmp, &gl_list) {
518                 struct ldlm_glimpse_work *work;
519
520                 work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
521
522                 cfs_list_del(&work->gl_list);
523                 CERROR("%s: failed to notify %s of new quota settings\n",
524                        qmt->qmt_svname,
525                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
526                 LDLM_LOCK_RELEASE(work->gl_lock);
527                 OBD_FREE_PTR(work);
528         }
529
530         RETURN(rc);
531 }
532
533 /*
534  * Send glimpse request to all global quota locks to push new quota setting to
535  * slaves.
536  *
537  * \param env - is the environment passed by the caller
538  * \param lqe - is the lquota entry which has new settings
539  * \param ver - is the version associated with the setting change
540  */
541 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
542                          __u64 ver)
543 {
544         struct qmt_thread_info  *qti = qmt_info(env);
545         struct qmt_pool_info    *pool = lqe2qpi(lqe);
546         struct ldlm_resource    *res = NULL;
547         int                      rc;
548         ENTRY;
549
550         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
551                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
552
553         /* send glimpse callback to notify slaves of new quota settings */
554         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
555         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
556         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
557         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
558         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
559         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
560
561         /* look up ldlm resource associated with global index */
562         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
563         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
564                                 LDLM_PLAIN, 0);
565         if (res == NULL) {
566                 /* this might happen if no slaves have enqueued global quota
567                  * locks yet */
568                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
569                              "with "DFID, PFID(&qti->qti_fid));
570                 RETURN_EXIT;
571         }
572
573         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
574                               NULL, NULL);
575         ldlm_resource_putref(res);
576         EXIT;
577 }
578
579 /* Callback function used to select locks that should be glimpsed when
580  * broadcasting the new qunit value */
581 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
582                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
583                           void *arg)
584 {
585         struct obd_uuid *slv_uuid = arg;
586
587         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
588                 RETURN(0);
589         RETURN(+1);
590 }
591
592 /*
593  * Send glimpse request on per-ID lock to push new qunit value to slave.
594  *
595  * \param env  - is the environment passed by the caller
596  * \param qmt  - is the quota master target device
597  * \param lqe  - is the lquota entry with the new qunit value
598  * \param uuid - is the uuid of the slave acquiring space, if any
599  */
600 static void qmt_id_lock_glimpse(const struct lu_env *env,
601                                 struct qmt_device *qmt,
602                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
603 {
604         struct qmt_thread_info  *qti = qmt_info(env);
605         struct qmt_pool_info    *pool = lqe2qpi(lqe);
606         struct ldlm_resource    *res = NULL;
607         int                      rc;
608         ENTRY;
609
610         if (!lqe->lqe_enforced)
611                 RETURN_EXIT;
612
613         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
614                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
615         fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
616         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
617                                 0);
618         if (res == NULL) {
619                 /* this might legitimately happens if slaves haven't had the
620                  * opportunity to enqueue quota lock yet. */
621                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
622                              "lock "DFID, PFID(&qti->qti_fid));
623                 lqe_write_lock(lqe);
624                 if (lqe->lqe_revoke_time == 0 &&
625                     lqe->lqe_qunit == pool->qpi_least_qunit)
626                         lqe->lqe_revoke_time = cfs_time_current_64();
627                 lqe_write_unlock(lqe);
628                 RETURN_EXIT;
629         }
630
631         lqe_write_lock(lqe);
632         /* The purpose of glimpse callback on per-ID lock is twofold:
633          * - notify slaves of new qunit value and hope they will release some
634          *   spare quota space in return
635          * - notify slaves that master ran out of quota space and there is no
636          *   need to send acquire request any more until further notice */
637
638         /* fill glimpse descriptor with lqe settings */
639         if (lqe->lqe_edquot)
640                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
641         else
642                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
643         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
644
645         if (lqe->lqe_revoke_time == 0 &&
646             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
647                 /* reset lqe_may_rel, it will be updated on glimpse callback
648                  * replies if needed */
649                 lqe->lqe_may_rel = 0;
650
651         /* The rebalance thread is the only thread which can issue glimpses */
652         LASSERT(!lqe->lqe_gl);
653         lqe->lqe_gl = true;
654         lqe_write_unlock(lqe);
655
656         /* issue glimpse callback to slaves */
657         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
658                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
659
660         lqe_write_lock(lqe);
661         if (lqe->lqe_revoke_time == 0 &&
662             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
663             lqe->lqe_qunit == pool->qpi_least_qunit) {
664                 lqe->lqe_revoke_time = cfs_time_current_64();
665                 qmt_adjust_edquot(lqe, cfs_time_current_sec());
666         }
667         LASSERT(lqe->lqe_gl);
668         lqe->lqe_gl = false;
669         lqe_write_unlock(lqe);
670
671         ldlm_resource_putref(res);
672         EXIT;
673 }
674
675 /*
676  * Schedule a glimpse request on per-ID locks to push new qunit value or
677  * edquot flag to quota slaves.
678  *
679  * \param qmt  - is the quota master target device
680  * \param lqe  - is the lquota entry with the new qunit value
681  */
682 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
683 {
684         bool    added = false;
685         ENTRY;
686
687         lqe_getref(lqe);
688         spin_lock(&qmt->qmt_reba_lock);
689         if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
690                 cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
691                 added = true;
692         }
693         spin_unlock(&qmt->qmt_reba_lock);
694
695         if (added)
696                 cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
697         else
698                 lqe_putref(lqe);
699         EXIT;
700 }
701
702 /*
703  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
704  * quota locks owned by slaves in order to notify them of:
705  * - a qunit shrink in which case slaves might release quota space back in
706  *   glimpse reply.
707  * - set/clear edquot flag used to cache the "quota exhausted" state of the
708  *   master. When the flag is set, slaves know that there is no need to
709  *   try to acquire quota from the master since this latter has already
710  *   distributed all the space.
711  */
712 static int qmt_reba_thread(void *arg)
713 {
714         struct qmt_device       *qmt = (struct qmt_device *)arg;
715         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
716         struct l_wait_info       lwi = { 0 };
717         struct lu_env           *env;
718         struct lquota_entry     *lqe, *tmp;
719         char                     pname[MTI_NAME_MAXLEN];
720         int                      rc;
721         ENTRY;
722
723         OBD_ALLOC_PTR(env);
724         if (env == NULL)
725                 RETURN(-ENOMEM);
726
727         rc = lu_env_init(env, LCT_MD_THREAD);
728         if (rc) {
729                 CERROR("%s: failed to init env.", qmt->qmt_svname);
730                 OBD_FREE_PTR(env);
731                 RETURN(rc);
732         }
733
734         snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
735         cfs_daemonize(pname);
736
737         thread_set_flags(thread, SVC_RUNNING);
738         cfs_waitq_signal(&thread->t_ctl_waitq);
739
740         while (1) {
741                 l_wait_event(thread->t_ctl_waitq,
742                              !cfs_list_empty(&qmt->qmt_reba_list) ||
743                              !thread_is_running(thread), &lwi);
744
745                 spin_lock(&qmt->qmt_reba_lock);
746                 cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
747                                              lqe_link) {
748                         cfs_list_del_init(&lqe->lqe_link);
749                         spin_unlock(&qmt->qmt_reba_lock);
750
751                         if (thread_is_running(thread))
752                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
753
754                         lqe_putref(lqe);
755                         spin_lock(&qmt->qmt_reba_lock);
756                 }
757                 spin_unlock(&qmt->qmt_reba_lock);
758
759                 if (!thread_is_running(thread))
760                         break;
761         }
762         lu_env_fini(env);
763         OBD_FREE_PTR(env);
764         thread_set_flags(thread, SVC_STOPPED);
765         cfs_waitq_signal(&thread->t_ctl_waitq);
766         RETURN(rc);
767 }
768
769 /*
770  * Start rebalance thread. Called when the QMT is being setup
771  */
772 int qmt_start_reba_thread(struct qmt_device *qmt)
773 {
774         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
775         struct l_wait_info       lwi    = { 0 };
776         int                      rc;
777         ENTRY;
778
779         rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
780         if (rc < 0) {
781                 CERROR("%s: failed to start rebalance thread (%d)\n",
782                        qmt->qmt_svname, rc);
783                 thread_set_flags(thread, SVC_STOPPED);
784                 RETURN(rc);
785         }
786
787         l_wait_event(thread->t_ctl_waitq,
788                      thread_is_running(thread) || thread_is_stopped(thread),
789                      &lwi);
790
791         RETURN(0);
792 }
793
794 /*
795  * Stop rebalance thread. Called when the QMT is about to shutdown.
796  */
797 void qmt_stop_reba_thread(struct qmt_device *qmt)
798 {
799         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
800
801         if (!thread_is_stopped(thread)) {
802                 struct l_wait_info lwi = { 0 };
803
804                 thread_set_flags(thread, SVC_STOPPING);
805                 cfs_waitq_signal(&thread->t_ctl_waitq);
806
807                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
808                              &lwi);
809         }
810         LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
811 }