Whamcloud - gitweb
LU-17245 utils: fix lfs error messages with multiple paths
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
/* intent policy function called from mdt_intent_opc() when the intent is of
 * quota type.
 *
 * Two intent opcodes are handled here:
 * - IT_QUOTA_DQACQ: a slave acquires (or pre-acquires/releases) quota space
 *   for the ID associated with the per-ID lock resource;
 * - IT_QUOTA_CONN:  a new slave connects and asks for its slave index FID
 *   and version.
 *
 * The per-request status is reported back to the client through
 * ldlm_rep->lock_policy_res2; once the reply is packed the function itself
 * always returns ELDLM_OK. */
int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
		      struct ptlrpc_request *req, struct ldlm_lock **lockp,
		      int flags)
{
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	struct ldlm_intent	*it;
	struct quota_body	*reqbody;
	struct quota_body	*repbody;
	struct obd_uuid		*uuid;
	struct lquota_lvb	*lvb;
	struct ldlm_resource	*res = (*lockp)->l_resource;
	struct ldlm_reply	*ldlm_rep;
	int			 rc, lvb_len;
	ENTRY;

	/* reserve room for the LVB in the reply before packing it */
	req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     ldlm_lvbo_size(*lockp));

	/* extract quota body and intent opc */
	it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	if (it == NULL)
		RETURN(err_serious(-EFAULT));

	reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (reqbody == NULL)
		RETURN(err_serious(-EFAULT));

	/* prepare reply */
	rc = req_capsule_server_pack(&req->rq_pill);
	if (rc != 0) {
		CERROR("Can't pack response, rc %d\n", rc);
		RETURN(err_serious(rc));
	}

	repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (repbody == NULL)
		RETURN(err_serious(-EFAULT));

	ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (ldlm_rep == NULL)
		RETURN(err_serious(-EFAULT));

	uuid = &(*lockp)->l_export->exp_client_uuid;
	switch (it->opc) {

	case IT_QUOTA_DQACQ: {
		struct lquota_entry	*lqe;
		struct ldlm_lock	*lock;
		int idx, stype;

		/* DQACQ is only valid on per-ID locks (non-zero quota seq) */
		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
			/* acquire on global lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		/* verify global lock isn't stale */
		if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
			GOTO(out, rc = -ENOLCK);

		lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
		if (lock == NULL)
			GOTO(out, rc = -ENOLCK);
		LDLM_LOCK_PUT(lock);

		/* map the client UUID to a slave type + target index */
		stype = qmt_uuid2idx(uuid, &idx);
		if (stype < 0)
			GOTO(out, rc = -EINVAL);

		/* TODO: it seems we don't need to get lqe from
		 * lq_lvb_data anymore ... And do extra get
		 * and put on it */
		lqe = res->lr_lvb_data;
		LASSERT(lqe != NULL);
		lqe_getref(lqe);

		/* populate the env's lqe array for this ID across pools;
		 * released below via qti_lqes_fini() */
		rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
					  lqe_qtype(lqe), &reqbody->qb_id,
					  NULL, idx);
		if (rc) {
			lqe_putref(lqe);
			GOTO(out, rc);
		}

		/* acquire quota space */
		rc = qmt_dqacq0(env, qmt, uuid,
				reqbody->qb_flags, reqbody->qb_count,
				reqbody->qb_usage, repbody,
				qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
		lqe_putref(lqe);
		qti_lqes_fini(env);
		if (rc)
			GOTO(out, rc);
		break;
	}

	case IT_QUOTA_CONN:
		/* new connection from slave */

		/* CONN is only valid on the global lock (zero quota seq) */
		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
			/* connection on per-ID lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
				       &repbody->qb_slv_fid,
				       &repbody->qb_slv_ver, uuid);
		if (rc)
			GOTO(out, rc);
		break;

	default:
		CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
		       it->opc);
		GOTO(out, rc = -EINVAL);
	}

	/* on success, pack lvb in reply */
	lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
	lvb_len = ldlm_lvbo_size(*lockp);
	lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
	if (lvb_len < 0)
		GOTO(out, rc = lvb_len);

	/* shrink the reply to the actual LVB size */
	req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
out:
	/* status is returned via the DLM reply, not the function result */
	ldlm_rep->lock_policy_res2 = clear_serious(rc);
	EXIT;
	return ELDLM_OK;
}
171
/*
 * Initialize quota LVB associated with quota indexes.
 * Called with res->lr_lvb_sem held.
 *
 * For a per-ID resource (non-zero quota seq), lr_lvb_data is set to a
 * referenced lquota_entry; for the global resource it is set to the global
 * index dt_object. qmt_lvbo_free() below is the matching teardown.
 *
 * \retval 0 on success (or when there is nothing to do), negative errno
 *	   on failure
 */
int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
{
	struct lu_env		*env;
	struct qmt_thread_info	*qti;
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	int			 pool_type, qtype;
	int			 rc;
	ENTRY;

	LASSERT(res != NULL);

	if (res->lr_type != LDLM_PLAIN)
		RETURN(-ENOTSUPP);

	/* already initialized, or not a quota resource at all */
	if (res->lr_lvb_data ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
		RETURN(0);

	env = lu_env_find();
	LASSERT(env);
	qti = qmt_info(env);

	/* extract global index FID and quota identifier */
	fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);

	/* sanity check the global index FID */
	rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
	if (rc) {
		CERROR("can't extract glb index information from FID "DFID"\n",
		       PFID(&qti->qti_fid));
		GOTO(out, rc);
	}

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock associated with UID/GID 0 or with a seq 0,
		 * we are thus dealing with an ID lock. */
		struct qmt_pool_info	*pool;
		struct lquota_entry	*lqe;
		struct lqe_glbl_data	*lgd;

		/* NOTE(review): error is mapped to -ENOMEM rather than
		 * PTR_ERR(pool) — confirm this is intentional */
		pool = qmt_pool_lookup_glb(env, qmt, pool_type);
		if (IS_ERR(pool))
			GOTO(out, rc = -ENOMEM);

		/* Find the quota entry associated with the quota id */
		lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
					  &qti->qti_id, NULL);
		if (IS_ERR(lqe)) {
			qpi_putref(env, pool);
			GOTO(out, rc = PTR_ERR(lqe));
		}

		/* TODO: need something like qmt_extend_lqe_gd that has
		 * to be called each time when qpi_slv_nr is incremented */
		lgd = qmt_alloc_lqe_gd(pool, qtype);
		if (!lgd) {
			lqe_putref(lqe);
			qpi_putref(env, pool);
			GOTO(out, rc = -ENOMEM);
		}

		qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);

		/* store reference to lqe in lr_lvb_data; the lqe reference
		 * taken by the lookup above is kept and dropped in
		 * qmt_lvbo_free() */
		res->lr_lvb_data = lqe;
		qpi_putref(env, pool);
		LQUOTA_DEBUG(lqe, "initialized res lvb");
	} else {
		struct dt_object	*obj;

		/* lookup global index */
		obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
		if (IS_ERR(obj))
			GOTO(out, rc = PTR_ERR(obj));
		if (!dt_object_exists(obj)) {
			dt_object_put(env, obj);
			GOTO(out, rc = -ENOENT);
		}

		/* store reference to global index object in lr_lvb_data */
		res->lr_lvb_data = obj;
		CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
	}

	res->lr_lvb_len = sizeof(struct lquota_lvb);
	EXIT;
out:
	return rc;
}
265
266 /* clear lge_qunit/edquot_nu flags -
267  * slave recieved new qunit and edquot.
268  *
269  * \retval      true if revoke is needed - qunit
270  *              for this slave reaches least_qunit
271  */
272 static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
273 {
274         unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
275         bool revoke = false;
276
277         /* There is no array to store lge for the case of DOM.
278          * Ignore it until MDT pools will be ready.
279          */
280         if (!qmt_dom(lqe_rtype(lqe), stype)) {
281                 struct lqe_glbl_data *lgd;
282
283                 mutex_lock(&lqe->lqe_glbl_data_lock);
284                 lgd = lqe->lqe_glbl_data;
285                 if (lgd) {
286                         int lge_idx = qmt_map_lge_idx(lgd, idx);
287
288                         lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
289                         lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
290                         /* We shouldn't call revoke for DOM case, it will be
291                          * updated at qmt_id_lock_glimpse.
292                          */
293                         revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
294                 }
295                 mutex_unlock(&lqe->lqe_glbl_data_lock);
296         }
297
298         return revoke;
299 }
300
/* If the lge qunit recorded for slave \a idx has reached the pool's least
 * qunit, stamp lqe_revoke_time and re-evaluate edquot on every lqe currently
 * set up in the env's lqe array (see qti_lqes*).
 *
 * \retval	true if at least one lqe changed edquot state and slaves
 *		should therefore be notified
 */
static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
			  int stype, int idx)
{
	unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
	bool notify = false;

	/* no per-slave lge array in the DOM case — nothing to do */
	if (qmt_dom(lqe_rtype(lqe_gl), stype))
		return false;

	/* lock order: lqes write lock first, then glbl data mutex */
	qti_lqes_write_lock(env);
	mutex_lock(&lqe_gl->lqe_glbl_data_lock);
	if (lqe_gl->lqe_glbl_data) {
		struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
		int lge_idx;

		lge_idx = qmt_map_lge_idx(lgd, idx);
		if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
			struct lquota_entry *lqe;
			int i;

			for (i = 0; i < qti_lqes_cnt(env); i++) {
				lqe = qti_lqes(env)[i];
				LQUOTA_DEBUG(lqe,
					     "lge_qunit %llu least_qunit %lu idx %d\n",
					     lgd->lqeg_arr[lge_idx].lge_qunit,
					     least_qunit, idx);
				if (lqe->lqe_qunit == least_qunit) {
					lqe->lqe_revoke_time =
							ktime_get_seconds();
					notify |= qmt_adjust_edquot(lqe,
						  ktime_get_real_seconds());
				}
			}
		}
	}
	mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
	qti_lqes_write_unlock(env);

	return notify;
}
341
342 /*
343  * Update LVB associated with the global quota index.
344  * This function is called from the DLM itself after a glimpse callback, in this
345  * case valid ptlrpc request is passed.
346  */
347 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
348                     struct ptlrpc_request *req, int increase_only)
349 {
350         struct lu_env           *env;
351         struct qmt_thread_info  *qti;
352         struct qmt_device       *qmt = lu2qmt_dev(ld);
353         struct lquota_entry     *lqe;
354         struct lquota_lvb       *lvb;
355         struct ldlm_lock        *lock;
356         struct obd_export       *exp;
357         bool                     need_revoke;
358         int                      rc = 0, idx, stype;
359         ENTRY;
360
361         LASSERT(res != NULL);
362
363         if (req == NULL)
364                 RETURN(0);
365
366         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
367                 /* no need to update lvb for global quota locks */
368                 RETURN(0);
369
370         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
371                                           lustre_swab_lquota_lvb);
372         if (lvb == NULL) {
373                 CERROR("%s: failed to extract lvb from request\n",
374                        qmt->qmt_svname);
375                 RETURN(-EFAULT);
376         }
377
378         lqe = res->lr_lvb_data;
379         LASSERT(lqe != NULL);
380         lqe_getref(lqe);
381
382         /* allocate environement */
383         env = lu_env_find();
384         LASSERT(env);
385         qti = qmt_info(env);
386
387         /* The request is a glimpse callback which was sent via the
388          * reverse import to the slave. What we care about here is the
389          * export associated with the slave and req->rq_export is
390          * definitely not what we are looking for (it is actually set to
391          * NULL here).
392          * Therefore we extract the lock from the request argument
393          * and use lock->l_export. */
394         lock = ldlm_request_lock(req);
395         if (IS_ERR(lock)) {
396                 CERROR("%s: failed to get lock from request!\n",
397                        qmt->qmt_svname);
398                 GOTO(out, rc = PTR_ERR(lock));
399         }
400
401         exp = class_export_get(lock->l_export);
402         if (exp == NULL) {
403                 CERROR("%s: failed to get export from lock!\n",
404                        qmt->qmt_svname);
405                 GOTO(out, rc = -EFAULT);
406         }
407
408         stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
409         if (stype < 0)
410                 GOTO(out_exp, rc = stype);
411
412         need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
413         if (lvb->lvb_id_rel == 0) {
414                 /* nothing to release */
415                 if (lvb->lvb_id_may_rel != 0)
416                         /* but might still release later ... */
417                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
418         }
419
420         if (!need_revoke && lvb->lvb_id_rel == 0)
421                 GOTO(out_exp, rc = 0);
422
423         rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
424                                   lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
425         if (rc)
426                 GOTO(out_exp, rc);
427
428         if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
429                 int notify = false;
430
431                 mutex_lock(&lqe->lqe_glbl_data_lock);
432                 if (lqe->lqe_glbl_data) {
433                         qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
434                         notify = true;
435                 }
436                 mutex_unlock(&lqe->lqe_glbl_data_lock);
437                 if (notify)
438                         qmt_id_lock_notify(qmt, lqe);
439         }
440
441         if (lvb->lvb_id_rel) {
442                 LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
443                              lvb->lvb_id_rel, lvb->lvb_id_may_rel);
444
445                 /* release quota space */
446                 rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
447                                 QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
448                                 0, &qti->qti_body,
449                                 qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
450                 if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
451                         LQUOTA_ERROR(lqe,
452                                      "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
453                                      qti->qti_body.qb_count,
454                                      lvb->lvb_id_rel, rc);
455         }
456         qti_lqes_fini(env);
457         if (rc)
458                 GOTO(out_exp, rc);
459         EXIT;
460 out_exp:
461         class_export_put(exp);
462 out:
463         lqe_putref(lqe);
464         return rc;
465 }
466
/*
 * Report size of lvb to ldlm layer in order to allocate lvb buffer
 * As far as quota locks are concerned, the size is static and is the same
 * for both global and per-ID locks which share the same lvb format.
 */
int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
{
	/* both lock types carry a struct lquota_lvb, hence a constant size */
	return sizeof(struct lquota_lvb);
}
476
/*
 * Fill request buffer with quota lvb.
 *
 * For a per-ID lock the current qunit value and edquot flag are returned;
 * for the global lock the current version of the global index is returned.
 *
 * \retval sizeof(struct lquota_lvb) on success, negative errno on failure
 */
int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
		  int lvblen)
{
	struct ldlm_resource *res = lock->l_resource;
	struct lquota_lvb *qlvb = lvb;
	struct lu_env *env;
	int rc;
	ENTRY;

	LASSERT(res != NULL);
	rc = 0;

	if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
		RETURN(-EINVAL);

	env = lu_env_find();
	LASSERT(env);

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock associated with UID/GID 0 or with a seq 0,
		 * we are thus dealing with an ID lock. */
		struct lquota_entry *lqe = res->lr_lvb_data;
		struct qmt_device *qmt;
		struct obd_uuid *uuid;
		int idx;

		uuid = &(lock)->l_export->exp_client_uuid;
		/* on success rc holds the slave type, reused as the stype
		 * argument of qmt_pool_lqes_lookup() below */
		rc = qmt_uuid2idx(uuid, &idx);
		if (rc < 0)
			RETURN(rc);
		qmt = lu2qmt_dev(ld);
		/* return current qunit value & edquot flags in lvb */
		lqe_getref(lqe);
		rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
					  lqe_qtype(lqe), &lqe->lqe_id,
					  NULL, idx);
		if (!rc) {
			/* report the smallest qunit across all pools the ID
			 * belongs to, and set edquot if any pool is over */
			qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
			qlvb->lvb_flags = 0;
			if (qti_lqes_edquot(env))
				qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
			qti_lqes_fini(env);
		}
		CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
		       (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
		       qlvb->lvb_flags, qlvb->lvb_id_qunit);
		lqe_putref(lqe);
	} else {
		/* global quota lock */
		struct dt_object *obj = res->lr_lvb_data;

		/* return current version of global index */
		qlvb->lvb_glb_ver = dt_version_get(env, obj);
	}

	/* on success return the number of bytes written into the lvb */
	RETURN(rc = rc ?: sizeof(struct lquota_lvb));
}
538
539 /*
540  * Free lvb associated with a given ldlm resource
541  * we don't really allocate a lvb, lr_lvb_data just points to
542  * the appropriate backend structures.
543  */
544 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
545 {
546         ENTRY;
547
548         if (res->lr_lvb_data == NULL)
549                 RETURN(0);
550
551         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
552                 struct lquota_entry *lqe = res->lr_lvb_data;
553                 struct lqe_glbl_data *lgd;
554
555                 mutex_lock(&lqe->lqe_glbl_data_lock);
556                 lgd = lqe->lqe_glbl_data;
557                 lqe->lqe_glbl_data = NULL;
558                 mutex_unlock(&lqe->lqe_glbl_data_lock);
559                 qmt_free_lqe_gd(lgd);
560
561                 /* release lqe reference */
562                 lqe_putref(lqe);
563         } else {
564                 struct dt_object *obj = res->lr_lvb_data;
565                 /* release object reference */
566                 dt_object_put(lu_env_find(), obj);
567         }
568
569         res->lr_lvb_data = NULL;
570         res->lr_lvb_len  = 0;
571
572         RETURN(0);
573 }
574
/* per-lock filter callback for qmt_glimpse_lock(); return 0 to skip the
 * slave holding the lock, non-zero to glimpse it */
typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);

/* growable snapshot of granted locks collected by qmt_alloc_lock_array()
 * and released by qmt_free_lock_array() */
struct qmt_gl_lock_array {
	unsigned long		  q_max;	/* allocated slots in q_locks */
	unsigned long		  q_cnt;	/* referenced locks stored */
	struct ldlm_lock	**q_locks;	/* lock refs (LDLM_LOCK_GET) */
};
582
583 static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
584 {
585         int i;
586
587         if (array->q_max == 0) {
588                 LASSERT(array->q_locks == NULL);
589                 return;
590         }
591
592         for (i = 0; i < array->q_cnt; i++) {
593                 LASSERT(array->q_locks[i]);
594                 LDLM_LOCK_RELEASE(array->q_locks[i]);
595                 array->q_locks[i] = NULL;
596         }
597         array->q_cnt = 0;
598         OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
599         array->q_locks = NULL;
600         array->q_max = 0;
601 }
602
/* Collect references on all granted locks of \a res that pass the optional
 * filter \a cb into \a array.
 *
 * Since the number of qualifying locks is unknown up front, the granted list
 * is scanned first with an empty array just to count, then re-scanned after
 * allocating slots (with some slack). If new locks appear between passes the
 * allocation is retried, up to 5 times.
 *
 * \retval 0 on success, -EAGAIN if the lock list kept growing,
 *	   -ENOMEM on allocation failure
 */
static int qmt_alloc_lock_array(struct ldlm_resource *res,
				struct qmt_gl_lock_array *array,
				qmt_glimpse_cb_t cb, void *arg)
{
	struct lquota_entry *lqe = arg;
	struct list_head *pos;
	unsigned long count = 0;
	int fail_cnt = 0;
	ENTRY;

	LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
again:
	/* the callback reads lqe_glbl_data, so hold its mutex across the
	 * scan (lock order: mutex before lock_res) */
	if (cb)
		mutex_lock(&lqe->lqe_glbl_data_lock);
	lock_res(res);
	/* scan list of granted locks */
	list_for_each(pos, &res->lr_granted) {
		struct ldlm_lock *lock;
		int rc;

		lock = list_entry(pos, struct ldlm_lock, l_res_link);
		LASSERT(lock->l_export);

		if (cb != NULL) {
			rc = cb(lock, arg);
			/* slave should not be notified */
			if (rc == 0)
				continue;
		}

		/* count every qualifying lock; store a reference only while
		 * there is room, overflow is detected below */
		count++;
		if (array->q_max != 0 && array->q_cnt < array->q_max) {
			array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
			array->q_cnt++;
		}
	}
	unlock_res(res);
	if (cb)
		mutex_unlock(&lqe->lqe_glbl_data_lock);

	if (count > array->q_max) {
		/* array too small (or first pass): drop refs and retry with
		 * a bigger allocation */
		qmt_free_lock_array(array);
		if (++fail_cnt > 5)
			RETURN(-EAGAIN);
		/*
		 * allocate more slots in case of more qualified locks are
		 * found during next loop
		 */
		array->q_max = count + count / 2 + 10;
		count = 0;
		LASSERT(array->q_locks == NULL && array->q_cnt == 0);
		OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
		if (array->q_locks == NULL) {
			array->q_max = 0;
			RETURN(-ENOMEM);
		}

		goto again;
	}
	RETURN(0);
}
664
/* Fill glimpse descriptor \a desc with the qunit/edquot values the slave
 * holding \a lock should see: per-slave values from lqe_glbl_data when
 * available, global lqe values otherwise (DOM, or no glbl data yet). */
static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
			      struct lquota_entry *lqe)
{
	struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
	int idx, stype;
	__u64 qunit;
	bool edquot;

	stype = qmt_uuid2idx(uuid, &idx);
	LASSERT(stype >= 0);

	/* DOM case - set global lqe settings */
	if (qmt_dom(lqe_rtype(lqe), stype)) {
		edquot = lqe->lqe_edquot;
		qunit = lqe->lqe_qunit;
	} else {
		struct lqe_glbl_data *lgd;
		int lge_idx;

		mutex_lock(&lqe->lqe_glbl_data_lock);
		lgd = lqe->lqe_glbl_data;
		if (lgd) {
			/* per-slave settings for this target index */
			lge_idx = qmt_map_lge_idx(lgd, idx);
			edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
			qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
		} else {
			/* no per-slave array (yet) - fall back to globals */
			edquot = lqe->lqe_edquot;
			qunit = lqe->lqe_qunit;
		}
		mutex_unlock(&lqe->lqe_glbl_data_lock);
	}

	/* fill glimpse descriptor with lqe settings */
	desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
	desc->lquota_desc.gl_qunit = qunit;
	CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
			 stype, idx, desc->lquota_desc.gl_flags,
			 desc->lquota_desc.gl_qunit);
}
704
705 /*
706  * Send glimpse callback to slaves holding a lock on resource \res.
707  * This is used to notify slaves of new quota settings or to claim quota space
708  * back.
709  *
710  * \param env  - is the environment passed by the caller
711  * \param qmt  - is the quota master target
712  * \param res  - is the dlm resource associated with the quota object
713  * \param desc - is the glimpse descriptor to pack in glimpse callback
714  * \param cb   - is the callback function called on every lock and determine
715  *               whether a glimpse should be issued
716  * \param arg  - is an opaq parameter passed to the callback function
717  */
718 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
719                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
720                             qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
721 {
722         union ldlm_gl_desc *descs = NULL;
723         struct list_head *tmp, *pos;
724         LIST_HEAD(gl_list);
725         struct qmt_gl_lock_array locks;
726         unsigned long i, locks_count;
727         int rc = 0;
728         ENTRY;
729
730         memset(&locks, 0, sizeof(locks));
731         rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
732         if (rc) {
733                 CERROR("%s: failed to allocate glimpse lock array (%d)\n",
734                        qmt->qmt_svname, rc);
735                 RETURN(rc);
736         }
737         if (!locks.q_cnt) {
738                 CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
739                        qmt->qmt_svname);
740                 RETURN(0);
741         }
742         CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
743         locks_count = locks.q_cnt;
744
745         /* Use one desc for all works, when called from qmt_glb_lock_notify */
746         if (cb && locks.q_cnt > 1) {
747                 /* TODO: think about to store this preallocated descs
748                  * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
749                  * The benefit is that we don't need to allocate/free
750                  * and setup this descs each time. But the drawback is
751                  * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number).
752                  * for examfple it could be 88 * 256 * 10 000 about 225 MB. */
753                 OBD_ALLOC(descs,
754                           sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
755                 if (!descs) {
756                         CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
757                                qmt->qmt_svname, rc);
758                         qmt_free_lock_array(&locks);
759                         RETURN(-ENOMEM);
760                 }
761         }
762
763         for (i = locks.q_cnt; i > 0; i--) {
764                 struct ldlm_glimpse_work *work;
765
766                 OBD_ALLOC_PTR(work);
767                 if (work == NULL) {
768                         CERROR("%s: failed to notify a lock.\n",
769                                qmt->qmt_svname);
770                         continue;
771                 }
772
773                 if (cb) {
774                         if (descs)
775                                 desc = &descs[i - 1];
776                         qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
777                         work->gl_interpret_data = lqe;
778                 }
779
780                 list_add_tail(&work->gl_list, &gl_list);
781                 work->gl_lock  = locks.q_locks[i - 1];
782                 work->gl_flags = 0;
783                 work->gl_desc  = desc;
784
785                 locks.q_locks[i - 1] = NULL;
786                 locks.q_cnt--;
787         }
788
789         qmt_free_lock_array(&locks);
790
791         if (list_empty(&gl_list)) {
792                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
793                 GOTO(out, rc = 0);
794         }
795
796         /* issue glimpse callbacks to all connected slaves */
797         rc = ldlm_glimpse_locks(res, &gl_list);
798
799         list_for_each_safe(pos, tmp, &gl_list) {
800                 struct ldlm_glimpse_work *work;
801
802                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
803
804                 list_del(&work->gl_list);
805                 CERROR("%s: failed to notify %s of new quota settings\n",
806                        qmt->qmt_svname,
807                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
808                 LDLM_LOCK_RELEASE(work->gl_lock);
809                 OBD_FREE_PTR(work);
810         }
811 out:
812         if (descs)
813                 OBD_FREE(descs,
814                          sizeof(struct ldlm_gl_lquota_desc) * locks_count);
815
816         RETURN(rc);
817 }
818
/*
 * Send glimpse request to all global quota locks to push new quota setting to
 * slaves.
 *
 * \param env - is the environment passed by the caller
 * \param lqe - is the lquota entry which has new settings
 * \param ver - is the version associated with the setting change
 */
void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
			 __u64 ver)
{
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;
	ENTRY;

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));

	/* send glimpse callback to notify slaves of new quota settings.
	 * The kind of change is encoded as a flag in the gl_time field
	 * via LQUOTA_GRACE_FLAG() */
	qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
	qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
	if (lqe->lqe_is_default) {
		/* limits come from the default quota: push zero limits
		 * tagged with LQUOTA_FLAG_DEFAULT */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_DEFAULT);

	} else if (lqe->lqe_is_deleted) {
		/* the ID is being removed: zero limits tagged with
		 * LQUOTA_FLAG_DELETED */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_DELETED);
	} else if (lqe->lqe_is_reset) {
		/* usage was reset: push current limits tagged with
		 * LQUOTA_FLAG_RESET */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_RESET);
	} else if (lqe->lqe_granted > lqe->lqe_hardlimit) {
		/* granted space now exceeds the new hard limit: tag with
		 * LQUOTA_FLAG_REVOKE */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_REVOKE);
	} else {
		/* regular setting change: push limits and grace time as-is */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
	}
	qti->qti_gl_desc.lquota_desc.gl_ver       = ver;

	/* look up ldlm resource associated with global index */
	fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
	res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
				LDLM_PLAIN, 0);
	if (IS_ERR(res)) {
		/* this might happen if no slaves have enqueued global quota
		 * locks yet */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
			     "with "DFID, PFID(&qti->qti_fid));
		RETURN_EXIT;
	}

	/* NULL callback/arg: glimpse every lock on the global resource */
	qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
			 NULL, NULL);
	ldlm_resource_putref(res);
	EXIT;
}
885
886 /* Callback function used to select locks that should be glimpsed when
887  * broadcasting the new qunit value */
888 static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
889 {
890         struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
891         struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
892         int idx;
893         int stype = qmt_uuid2idx(uuid, &idx);
894
895         LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
896
897         CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
898                stype, lqe_rtype(lqe), idx, uuid->uuid);
899         /* Quota pools support only OSTs, despite MDTs also could be registered
900          * as LQUOTA_RES_DT devices(DOM). */
901         if (qmt_dom(lqe_rtype(lqe), stype))
902                 return 1;
903
904         if (lgd) {
905                 int lge_idx = qmt_map_lge_idx(lgd, idx);
906
907                 CDEBUG(D_QUOTA,
908                        "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
909                        idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
910                        lgd->lqeg_arr[lge_idx].lge_qunit_nu);
911                 return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
912                        lgd->lqeg_arr[lge_idx].lge_qunit_nu;
913         }
914
915         return 0;
916 }
917
918
/*
 * Send glimpse request on per-ID lock to push new qunit value to slave.
 *
 * \param env  - is the environment passed by the caller
 * \param qmt  - is the quota master target device
 * \param lqe  - is the lquota entry with the new qunit value
 * \param uuid - is the uuid of the slave acquiring space, if any.
 *               NOTE(review): \a uuid is not referenced anywhere in the
 *               current body — confirm whether it can be dropped.
 */
static void qmt_id_lock_glimpse(const struct lu_env *env,
				struct qmt_device *qmt,
				struct lquota_entry *lqe, struct obd_uuid *uuid)
{
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;
	ENTRY;

	/* no enforcement on this ID, nothing to push to slaves */
	if (!lqe->lqe_enforced)
		RETURN_EXIT;

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
	fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
	res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
	if (IS_ERR(res)) {
		/* this might legitimately happens if slaves haven't had the
		 * opportunity to enqueue quota lock yet. */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
			     "lock "DFID, PFID(&qti->qti_fid));
		/* still record when the qunit reached its floor, as done on
		 * the normal path below */
		lqe_write_lock(lqe);
		if (lqe->lqe_revoke_time == 0 &&
		    lqe->lqe_qunit == pool->qpi_least_qunit)
			lqe->lqe_revoke_time = ktime_get_seconds();
		lqe_write_unlock(lqe);
		RETURN_EXIT;
	}

	lqe_write_lock(lqe);
	/*
	 * It is possible to add an lqe in a 2nd time while the same lqe
	 * from the 1st time is still sending glimpse; lqe_gl below is the
	 * in-flight marker that makes this call bail out early.
	 */
	if (lqe->lqe_gl)
		GOTO(out, 0);
	/* The purpose of glimpse callback on per-ID lock is twofold:
	 * - notify slaves of new qunit value and hope they will release some
	 *   spare quota space in return
	 * - notify slaves that master ran out of quota space and there is no
	 *   need to send acquire request any more until further notice */

	/* TODO: it is not clear how to implement below case for all lqes
	 * from where slaves will be notified in qmt_glimpse_lock. Because
	 * here we have just global lqe with an array of OSTs that should
	 * be notified. Theoretically we can find all lqes that includes
	 * these OSTs, but it is not trivial. So I would propose to move
	 * this case to another place ... */
	if (lqe->lqe_revoke_time == 0 &&
	    lqe->lqe_qunit == pool->qpi_least_qunit)
		/* reset lqe_may_rel, it will be updated on glimpse callback
		 * replies if needed */
		lqe->lqe_may_rel = 0;

	/* mark a glimpse in flight so concurrent callers bail out above */
	lqe->lqe_gl = true;
	lqe_write_unlock(lqe);

	/* issue glimpse callback to slaves (lqe must not be write-locked
	 * across this call) */
	if (lqe->lqe_glbl_data)
		qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
				 qmt_id_lock_cb, lqe);

	lqe_write_lock(lqe);
	if (lqe->lqe_revoke_time == 0 &&
	    lqe->lqe_qunit == pool->qpi_least_qunit) {
		lqe->lqe_revoke_time = ktime_get_seconds();
		qmt_adjust_edquot(lqe, ktime_get_real_seconds());
	}
	LASSERT(lqe->lqe_gl);
	lqe->lqe_gl = false;
out:
	lqe_write_unlock(lqe);
	ldlm_resource_putref(res);
	EXIT;
}
1001
/*
 * Schedule a glimpse request on per-ID locks to push new qunit value or
 * edquot flag to quota slaves.
 *
 * \param qmt  - is the quota master target device
 * \param lqe  - is the lquota entry with the new qunit value
 */
void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
{
	bool	added = false;
	ENTRY;

	LASSERT(lqe->lqe_is_global);
	/* reference taken on behalf of the rebalance thread; released by
	 * qmt_reba_thread() after processing, or below if nothing was
	 * queued */
	lqe_getref(lqe);
	spin_lock(&qmt->qmt_reba_lock);
	/* list_empty() makes re-queueing an already-pending lqe a no-op */
	if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
		list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
		added = true;
		if (qmt->qmt_reba_task)
			wake_up_process(qmt->qmt_reba_task);
	}
	spin_unlock(&qmt->qmt_reba_lock);

	/* not queued (QMT stopping or already pending): drop our ref */
	if (!added)
		lqe_putref(lqe);
	EXIT;
}
1029
/* Arguments handed to qmt_reba_thread(); owned (and freed) by the thread
 * once it has started running. */
struct qmt_reba_args {
	struct qmt_device	*qra_dev;	/* QMT the thread rebalances for */
	struct lu_env		 qra_env;	/* env the thread runs under */
	struct completion	*qra_started;	/* completed once thread runs */
};

/* kernels without TASK_IDLE fall back to an interruptible sleep */
#ifndef TASK_IDLE
#define TASK_IDLE TASK_INTERRUPTIBLE
#endif
1039
/*
 * The rebalance thread is in charge of sending glimpse callbacks on per-ID
 * quota locks owned by slaves in order to notify them of:
 * - a qunit shrink in which case slaves might release quota space back in
 *   glimpse reply.
 * - set/clear edquot flag used to cache the "quota exhausted" state of the
 *   master. When the flag is set, slaves know that there is no need to
 *   try to acquire quota from the master since this latter has already
 *   distributed all the space.
 */
static int qmt_reba_thread(void *_args)
{
	struct qmt_reba_args	*args = _args;
	struct qmt_device	*qmt = args->qra_dev;
	struct lu_env		*env = &args->qra_env;
	struct lquota_entry	*lqe, *tmp;
	ENTRY;

	complete(args->qra_started);
	/* set TASK_IDLE *before* checking kthread_should_stop() so a
	 * wake-up arriving in between is not lost (standard kthread idiom) */
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {

		spin_lock(&qmt->qmt_reba_lock);
		list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
					 lqe_link) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&lqe->lqe_link);
			/* drop the spinlock across the (blocking) glimpse.
			 * NOTE(review): 'tmp' is assumed to stay valid while
			 * unlocked because only this thread removes entries
			 * from qmt_reba_list — confirm against other users */
			spin_unlock(&qmt->qmt_reba_lock);

			/* lqe_ref == 1 means we hold the last ref,
			 * so no need to send glimpse callbacks.
			 */
			if (!kthread_should_stop() &&
			    atomic_read(&lqe->lqe_ref) > 1)
				qmt_id_lock_glimpse(env, qmt, lqe, NULL);

			/* drop the reference taken by qmt_id_lock_notify() */
			lqe_putref(lqe);
			spin_lock(&qmt->qmt_reba_lock);
		}
		spin_unlock(&qmt->qmt_reba_lock);
		schedule();
	}
	__set_current_state(TASK_RUNNING);

	/* the thread owns args (including the env) and frees them on exit */
	lu_env_remove(env);
	lu_env_fini(env);
	OBD_FREE_PTR(args);
	RETURN(0);
}
1089
1090 /*
1091  * Start rebalance thread. Called when the QMT is being setup
1092  */
1093 int qmt_start_reba_thread(struct qmt_device *qmt)
1094 {
1095         struct task_struct *task;
1096         struct qmt_reba_args *args;
1097         DECLARE_COMPLETION_ONSTACK(started);
1098         int rc;
1099         ENTRY;
1100
1101         OBD_ALLOC_PTR(args);
1102         if (args == NULL)
1103                 RETURN(-ENOMEM);
1104         args->qra_dev = qmt;
1105         args->qra_started = &started;
1106
1107         rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
1108         if (rc) {
1109                 CERROR("%s: failed to init env.\n", qmt->qmt_svname);
1110                 GOTO(out_env, rc);
1111         }
1112
1113         task = kthread_create(qmt_reba_thread, args,
1114                               "qmt_reba_%s", qmt->qmt_svname);
1115         if (IS_ERR(task)) {
1116                 CERROR("%s: failed to start rebalance thread (%ld)\n",
1117                        qmt->qmt_svname, PTR_ERR(task));
1118                 GOTO(out_env_fini, rc = PTR_ERR(task));
1119         }
1120
1121         rc = lu_env_add_task(&args->qra_env, task);
1122         if (rc) {
1123                 kthread_stop(task);
1124                 GOTO(out_env_fini, rc);
1125         }
1126         qmt->qmt_reba_task = task;
1127         wake_up_process(task);
1128         wait_for_completion(&started);
1129
1130         RETURN(0);
1131 out_env_fini:
1132         lu_env_fini(&args->qra_env);
1133 out_env:
1134         OBD_FREE_PTR(args);
1135         RETURN(rc);
1136 }
1137
1138 /*
1139  * Stop rebalance thread. Called when the QMT is about to shutdown.
1140  */
1141 void qmt_stop_reba_thread(struct qmt_device *qmt)
1142 {
1143         struct task_struct *task;
1144
1145         spin_lock(&qmt->qmt_reba_lock);
1146         task = qmt->qmt_reba_task;
1147         qmt->qmt_reba_task = NULL;
1148         spin_unlock(&qmt->qmt_reba_lock);
1149
1150         if (task)
1151                 kthread_stop(task);
1152
1153         LASSERT(list_empty(&qmt->qmt_reba_list));
1154 }