Whamcloud - gitweb
LU-16271 ptlrpc: fix eviction right after recovery
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
41 /* intent policy function called from mdt_intent_opc() when the intent is of
42  * quota type */
43 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
44                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
45                       int flags)
46 {
47         struct qmt_device       *qmt = lu2qmt_dev(ld);
48         struct ldlm_intent      *it;
49         struct quota_body       *reqbody;
50         struct quota_body       *repbody;
51         struct obd_uuid         *uuid;
52         struct lquota_lvb       *lvb;
53         struct ldlm_resource    *res = (*lockp)->l_resource;
54         struct ldlm_reply       *ldlm_rep;
55         int                      rc, lvb_len;
56         ENTRY;
57
58         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60                              ldlm_lvbo_size(*lockp));
61
62         /* extract quota body and intent opc */
63         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
64         if (it == NULL)
65                 RETURN(err_serious(-EFAULT));
66
67         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
68         if (reqbody == NULL)
69                 RETURN(err_serious(-EFAULT));
70
71         /* prepare reply */
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc != 0) {
74                 CERROR("Can't pack response, rc %d\n", rc);
75                 RETURN(err_serious(rc));
76         }
77
78         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
79         if (repbody == NULL)
80                 RETURN(err_serious(-EFAULT));
81
82         ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
83         if (ldlm_rep == NULL)
84                 RETURN(err_serious(-EFAULT));
85
86         uuid = &(*lockp)->l_export->exp_client_uuid;
87         switch (it->opc) {
88
89         case IT_QUOTA_DQACQ: {
90                 struct lquota_entry     *lqe;
91                 struct ldlm_lock        *lock;
92                 int idx;
93
94                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
95                         /* acquire on global lock? something is wrong ... */
96                         GOTO(out, rc = -EPROTO);
97
98                 /* verify global lock isn't stale */
99                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
100                         GOTO(out, rc = -ENOLCK);
101
102                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
103                 if (lock == NULL)
104                         GOTO(out, rc = -ENOLCK);
105                 LDLM_LOCK_PUT(lock);
106
107                 rc = qmt_uuid2idx(uuid, &idx);
108                 if (rc < 0)
109                         GOTO(out, rc = -EINVAL);
110
111                 /* TODO: it seems we don't need to get lqe from
112                  * lq_lvb_data anymore ... And do extra get
113                  * and put on it */
114                 lqe = res->lr_lvb_data;
115                 LASSERT(lqe != NULL);
116                 lqe_getref(lqe);
117
118                 rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
119                                           lqe_qtype(lqe), &reqbody->qb_id,
120                                           NULL, idx);
121                 if (rc) {
122                         lqe_putref(lqe);
123                         GOTO(out, rc);
124                 }
125
126                 /* acquire quota space */
127                 rc = qmt_dqacq0(env, qmt, uuid,
128                                 reqbody->qb_flags, reqbody->qb_count,
129                                 reqbody->qb_usage, repbody);
130                 lqe_putref(lqe);
131                 qti_lqes_fini(env);
132                 if (rc)
133                         GOTO(out, rc);
134                 break;
135         }
136
137         case IT_QUOTA_CONN:
138                 /* new connection from slave */
139
140                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
141                         /* connection on per-ID lock? something is wrong ... */
142                         GOTO(out, rc = -EPROTO);
143
144                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
145                                        &repbody->qb_slv_fid,
146                                        &repbody->qb_slv_ver, uuid);
147                 if (rc)
148                         GOTO(out, rc);
149                 break;
150
151         default:
152                 CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
153                        it->opc);
154                 GOTO(out, rc = -EINVAL);
155         }
156
157         /* on success, pack lvb in reply */
158         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
159         lvb_len = ldlm_lvbo_size(*lockp);
160         lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
161         if (lvb_len < 0)
162                 GOTO(out, rc = lvb_len);
163
164         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
165 out:
166         ldlm_rep->lock_policy_res2 = clear_serious(rc);
167         EXIT;
168         return ELDLM_OK;
169 }
170
171 /*
172  * Initialize quota LVB associated with quota indexes.
173  * Called with res->lr_lvb_sem held
174  */
175 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
176 {
177         struct lu_env           *env;
178         struct qmt_thread_info  *qti;
179         struct qmt_device       *qmt = lu2qmt_dev(ld);
180         int                      pool_type, qtype;
181         int                      rc;
182         ENTRY;
183
184         LASSERT(res != NULL);
185
186         if (res->lr_type != LDLM_PLAIN)
187                 RETURN(-ENOTSUPP);
188
189         if (res->lr_lvb_data ||
190             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
191                 RETURN(0);
192
193         env = lu_env_find();
194         LASSERT(env);
195         qti = qmt_info(env);
196
197         /* extract global index FID and quota identifier */
198         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
199
200         /* sanity check the global index FID */
201         rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
202         if (rc) {
203                 CERROR("can't extract glb index information from FID "DFID"\n",
204                        PFID(&qti->qti_fid));
205                 GOTO(out, rc);
206         }
207
208         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
209                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
210                  * we are thus dealing with an ID lock. */
211                 struct qmt_pool_info    *pool;
212                 struct lquota_entry     *lqe;
213                 struct lqe_glbl_data    *lgd;
214
215                 pool = qmt_pool_lookup_glb(env, qmt, pool_type);
216                 if (IS_ERR(pool))
217                         GOTO(out, rc = -ENOMEM);
218
219                 /* Find the quota entry associated with the quota id */
220                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
221                                           &qti->qti_id, NULL);
222                 if (IS_ERR(lqe)) {
223                         qpi_putref(env, pool);
224                         GOTO(out, rc = PTR_ERR(lqe));
225                 }
226
227                 /* TODO: need something like qmt_extend_lqe_gd that has
228                  * to be calledeach time when qpi_slv_nr is incremented */
229                 lgd = qmt_alloc_lqe_gd(pool, qtype);
230                 if (!lgd) {
231                         lqe_putref(lqe);
232                         qpi_putref(env, pool);
233                         GOTO(out, rc = -ENOMEM);
234                 }
235
236                 qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);
237
238                 /* store reference to lqe in lr_lvb_data */
239                 res->lr_lvb_data = lqe;
240                 qpi_putref(env, pool);
241                 LQUOTA_DEBUG(lqe, "initialized res lvb");
242         } else {
243                 struct dt_object        *obj;
244
245                 /* lookup global index */
246                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
247                 if (IS_ERR(obj))
248                         GOTO(out, rc = PTR_ERR(obj));
249                 if (!dt_object_exists(obj)) {
250                         dt_object_put(env, obj);
251                         GOTO(out, rc = -ENOENT);
252                 }
253
254                 /* store reference to global index object in lr_lvb_data */
255                 res->lr_lvb_data = obj;
256                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
257         }
258
259         res->lr_lvb_len = sizeof(struct lquota_lvb);
260         EXIT;
261 out:
262         return rc;
263 }
264
265 /* clear lge_qunit/edquot_nu flags -
266  * slave recieved new qunit and edquot.
267  *
268  * \retval      true if revoke is needed - qunit
269  *              for this slave reaches least_qunit
270  */
271 static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
272 {
273         unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
274         struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
275
276         /* There is no array to store lge for the case of DOM.
277          * Ignore it until MDT pools will be ready. */
278         if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
279                 lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
280                 lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
281
282                 /* We shouldn't call revoke for DOM case, it will be updated
283                  * at qmt_id_lock_glimpse. */
284                 return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
285         }
286
287         return false;
288 }
289
290 static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
291                           int stype, int idx)
292 {
293         unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
294         struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
295         bool notify = false;
296
297         if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
298                 int i;
299
300                 qti_lqes_write_lock(env);
301                 for (i = 0; i < qti_lqes_cnt(env); i++) {
302                         LQUOTA_DEBUG(qti_lqes(env)[i],
303                                      "idx %d lge_qunit %llu least_qunit %lu\n",
304                                      idx, lgd->lqeg_arr[idx].lge_qunit,
305                                      least_qunit);
306                         if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
307                                 qti_lqes(env)[i]->lqe_revoke_time =
308                                                         ktime_get_seconds();
309                                 notify |= qmt_adjust_edquot(qti_lqes(env)[i],
310                                                   ktime_get_real_seconds());
311                         }
312                 }
313                 qti_lqes_write_unlock(env);
314         }
315         return notify;
316 }
317
318 /*
319  * Update LVB associated with the global quota index.
320  * This function is called from the DLM itself after a glimpse callback, in this
321  * case valid ptlrpc request is passed.
322  */
323 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
324                     struct ptlrpc_request *req, int increase_only)
325 {
326         struct lu_env           *env;
327         struct qmt_thread_info  *qti;
328         struct qmt_device       *qmt = lu2qmt_dev(ld);
329         struct lquota_entry     *lqe;
330         struct lquota_lvb       *lvb;
331         struct ldlm_lock        *lock;
332         struct obd_export       *exp;
333         bool                     need_revoke;
334         int                      rc = 0, idx;
335         ENTRY;
336
337         LASSERT(res != NULL);
338
339         if (req == NULL)
340                 RETURN(0);
341
342         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
343                 /* no need to update lvb for global quota locks */
344                 RETURN(0);
345
346         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
347                                           lustre_swab_lquota_lvb);
348         if (lvb == NULL) {
349                 CERROR("%s: failed to extract lvb from request\n",
350                        qmt->qmt_svname);
351                 RETURN(-EFAULT);
352         }
353
354         lqe = res->lr_lvb_data;
355         LASSERT(lqe != NULL);
356         lqe_getref(lqe);
357
358         /* allocate environement */
359         env = lu_env_find();
360         LASSERT(env);
361         qti = qmt_info(env);
362
363         /* The request is a glimpse callback which was sent via the
364          * reverse import to the slave. What we care about here is the
365          * export associated with the slave and req->rq_export is
366          * definitely not what we are looking for (it is actually set to
367          * NULL here).
368          * Therefore we extract the lock from the request argument
369          * and use lock->l_export. */
370         lock = ldlm_request_lock(req);
371         if (IS_ERR(lock)) {
372                 CERROR("%s: failed to get lock from request!\n",
373                        qmt->qmt_svname);
374                 GOTO(out, rc = PTR_ERR(lock));
375         }
376
377         exp = class_export_get(lock->l_export);
378         if (exp == NULL) {
379                 CERROR("%s: failed to get export from lock!\n",
380                        qmt->qmt_svname);
381                 GOTO(out, rc = -EFAULT);
382         }
383
384         rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
385         if (rc < 0)
386                 GOTO(out_exp, rc);
387
388         need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
389         if (lvb->lvb_id_rel == 0) {
390                 /* nothing to release */
391                 if (lvb->lvb_id_may_rel != 0)
392                         /* but might still release later ... */
393                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
394         }
395
396         if (!need_revoke && lvb->lvb_id_rel == 0)
397                 GOTO(out_exp, rc = 0);
398
399         rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
400                                   &lqe->lqe_id, NULL, idx);
401         if (rc)
402                 GOTO(out_exp, rc);
403
404         if (need_revoke && qmt_set_revoke(env, lqe, rc, idx) &&
405             lqe->lqe_glbl_data) {
406                 qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
407                 qmt_id_lock_notify(qmt, lqe);
408         }
409
410         if (lvb->lvb_id_rel) {
411                 LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
412                              lvb->lvb_id_rel, lvb->lvb_id_may_rel);
413
414                 /* release quota space */
415                 rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
416                                 QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
417                                 0, &qti->qti_body);
418                 if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
419                         LQUOTA_ERROR(lqe,
420                                      "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
421                                      qti->qti_body.qb_count,
422                                      lvb->lvb_id_rel, rc);
423         }
424         qti_lqes_fini(env);
425         if (rc)
426                 GOTO(out_exp, rc);
427         EXIT;
428 out_exp:
429         class_export_put(exp);
430 out:
431         lqe_putref(lqe);
432         return rc;
433 }
434
435 /*
436  * Report size of lvb to ldlm layer in order to allocate lvb buffer
437  * As far as quota locks are concerned, the size is static and is the same
438  * for both global and per-ID locks which shares the same lvb format.
439  */
440 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
441 {
442         return sizeof(struct lquota_lvb);
443 }
444
445 /*
446  * Fill request buffer with quota lvb
447  */
448 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
449                   int lvblen)
450 {
451         struct ldlm_resource *res = lock->l_resource;
452         struct lquota_lvb *qlvb = lvb;
453         struct lu_env *env;
454         int rc;
455         ENTRY;
456
457         LASSERT(res != NULL);
458         rc = 0;
459
460         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
461             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
462                 RETURN(-EINVAL);
463
464         env = lu_env_find();
465         LASSERT(env);
466
467         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
468                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
469                  * we are thus dealing with an ID lock. */
470                 struct lquota_entry *lqe = res->lr_lvb_data;
471                 struct qmt_device *qmt;
472                 struct obd_uuid *uuid;
473                 int idx;
474
475                 uuid = &(lock)->l_export->exp_client_uuid;
476                 rc = qmt_uuid2idx(uuid, &idx);
477                 if (rc < 0)
478                         RETURN(rc);
479                 qmt = lu2qmt_dev(ld);
480                 /* return current qunit value & edquot flags in lvb */
481                 lqe_getref(lqe);
482                 rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
483                                           lqe_qtype(lqe), &lqe->lqe_id,
484                                           NULL, idx);
485                 if (!rc) {
486                         qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
487                         qlvb->lvb_flags = 0;
488                         if (qti_lqes_edquot(env))
489                                 qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
490                         qti_lqes_fini(env);
491                 }
492                 CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
493                        (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
494                        qlvb->lvb_flags, qlvb->lvb_id_qunit);
495                 lqe_putref(lqe);
496         } else {
497                 /* global quota lock */
498                 struct dt_object *obj = res->lr_lvb_data;
499
500                 /* return current version of global index */
501                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
502         }
503
504         RETURN(rc = rc ?: sizeof(struct lquota_lvb));
505 }
506
507 /*
508  * Free lvb associated with a given ldlm resource
509  * we don't really allocate a lvb, lr_lvb_data just points to
510  * the appropriate backend structures.
511  */
512 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
513 {
514         ENTRY;
515
516         if (res->lr_lvb_data == NULL)
517                 RETURN(0);
518
519         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
520                 struct lquota_entry *lqe = res->lr_lvb_data;
521                 struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
522
523                 mutex_lock(&lqe->lqe_glbl_data_lock);
524                 lqe->lqe_glbl_data = NULL;
525                 qmt_free_lqe_gd(lgd);
526                 mutex_unlock(&lqe->lqe_glbl_data_lock);
527
528                 /* release lqe reference */
529                 lqe_putref(lqe);
530         } else {
531                 struct dt_object *obj = res->lr_lvb_data;
532                 /* release object reference */
533                 dt_object_put(lu_env_find(), obj);
534         }
535
536         res->lr_lvb_data = NULL;
537         res->lr_lvb_len  = 0;
538
539         RETURN(0);
540 }
541
542 typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);
543
544 struct qmt_gl_lock_array {
545         unsigned long             q_max;
546         unsigned long             q_cnt;
547         struct ldlm_lock        **q_locks;
548 };
549
550 static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
551 {
552         int i;
553
554         if (array->q_max == 0) {
555                 LASSERT(array->q_locks == NULL);
556                 return;
557         }
558
559         for (i = 0; i < array->q_cnt; i++) {
560                 LASSERT(array->q_locks[i]);
561                 LDLM_LOCK_RELEASE(array->q_locks[i]);
562                 array->q_locks[i] = NULL;
563         }
564         array->q_cnt = 0;
565         OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
566         array->q_locks = NULL;
567         array->q_max = 0;
568 }
569
570 static int qmt_alloc_lock_array(struct ldlm_resource *res,
571                                 struct qmt_gl_lock_array *array,
572                                 qmt_glimpse_cb_t cb, void *arg)
573 {
574         struct list_head *pos;
575         unsigned long count = 0;
576         int fail_cnt = 0;
577         ENTRY;
578
579         LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
580 again:
581         lock_res(res);
582         /* scan list of granted locks */
583         list_for_each(pos, &res->lr_granted) {
584                 struct ldlm_lock *lock;
585                 int rc;
586
587                 lock = list_entry(pos, struct ldlm_lock, l_res_link);
588                 LASSERT(lock->l_export);
589
590                 if (cb != NULL) {
591                         rc = cb(lock, arg);
592                         /* slave should not be notified */
593                         if (rc == 0)
594                                 continue;
595                 }
596
597                 count++;
598                 if (array->q_max != 0 && array->q_cnt < array->q_max) {
599                         array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
600                         array->q_cnt++;
601                 }
602         }
603         unlock_res(res);
604
605         if (count > array->q_max) {
606                 qmt_free_lock_array(array);
607                 if (++fail_cnt > 5)
608                         RETURN(-EAGAIN);
609                 /*
610                  * allocate more slots in case of more qualified locks are
611                  * found during next loop
612                  */
613                 array->q_max = count + count / 2 + 10;
614                 count = 0;
615                 LASSERT(array->q_locks == NULL && array->q_cnt == 0);
616                 OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
617                 if (array->q_locks == NULL) {
618                         array->q_max = 0;
619                         RETURN(-ENOMEM);
620                 }
621
622                 goto again;
623         }
624         RETURN(0);
625 }
626
627 void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
628                        struct lquota_entry *lqe)
629 {
630         struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
631         struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
632         int idx, stype;
633         __u64 qunit;
634         bool edquot;
635
636         stype = qmt_uuid2idx(uuid, &idx);
637         LASSERT(stype >= 0);
638
639         /* DOM case - set global lqe settings */
640         if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
641                 edquot = lqe->lqe_edquot;
642                 qunit = lqe->lqe_qunit;
643         } else {
644                 edquot = lgd->lqeg_arr[idx].lge_edquot;
645                 qunit = lgd->lqeg_arr[idx].lge_qunit;
646         }
647
648         /* fill glimpse descriptor with lqe settings */
649         desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
650         desc->lquota_desc.gl_qunit = qunit;
651         CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
652                          stype, idx, desc->lquota_desc.gl_flags,
653                          desc->lquota_desc.gl_qunit);
654 }
655
656 /*
657  * Send glimpse callback to slaves holding a lock on resource \res.
658  * This is used to notify slaves of new quota settings or to claim quota space
659  * back.
660  *
661  * \param env  - is the environment passed by the caller
662  * \param qmt  - is the quota master target
663  * \param res  - is the dlm resource associated with the quota object
664  * \param desc - is the glimpse descriptor to pack in glimpse callback
665  * \param cb   - is the callback function called on every lock and determine
666  *               whether a glimpse should be issued
667  * \param arg  - is an opaq parameter passed to the callback function
668  */
669 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
670                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
671                             qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
672 {
673         union ldlm_gl_desc *descs = NULL;
674         struct lqe_glbl_data *gld;
675         struct list_head *tmp, *pos;
676         LIST_HEAD(gl_list);
677         struct qmt_gl_lock_array locks;
678         unsigned long i, locks_count;
679         int rc = 0;
680         ENTRY;
681
682         gld = lqe ? lqe->lqe_glbl_data : NULL;
683         memset(&locks, 0, sizeof(locks));
684         rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
685         if (rc) {
686                 CERROR("%s: failed to allocate glimpse lock array (%d)\n",
687                        qmt->qmt_svname, rc);
688                 RETURN(rc);
689         }
690         if (!locks.q_cnt) {
691                 CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
692                        qmt->qmt_svname);
693                 RETURN(0);
694         }
695         CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
696         locks_count = locks.q_cnt;
697
698         /* Use one desc for all works, when called from qmt_glb_lock_notify */
699         if (gld && locks.q_cnt > 1) {
700                 /* TODO: think about to store this preallocated descs
701                  * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
702                  * The benefit is that we don't need to allocate/free
703                  * and setup this descs each time. But the drawback is
704                  * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number).
705                  * for examfple it could be 88 * 256 * 10 000 about 225 MB. */
706                 OBD_ALLOC(descs,
707                           sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
708                 if (!descs) {
709                         CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
710                                qmt->qmt_svname, rc);
711                         qmt_free_lock_array(&locks);
712                         RETURN(-ENOMEM);
713                 }
714         }
715
716         for (i = locks.q_cnt; i > 0; i--) {
717                 struct ldlm_glimpse_work *work;
718
719                 OBD_ALLOC_PTR(work);
720                 if (work == NULL) {
721                         CERROR("%s: failed to notify a lock.\n",
722                                qmt->qmt_svname);
723                         continue;
724                 }
725
726                 if (gld) {
727                         if (descs)
728                                 desc = &descs[i - 1];
729                         qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
730                         work->gl_interpret_data = lqe;
731                 }
732
733                 list_add_tail(&work->gl_list, &gl_list);
734                 work->gl_lock  = locks.q_locks[i - 1];
735                 work->gl_flags = 0;
736                 work->gl_desc  = desc;
737
738                 locks.q_locks[i - 1] = NULL;
739                 locks.q_cnt--;
740         }
741
742         qmt_free_lock_array(&locks);
743
744         if (list_empty(&gl_list)) {
745                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
746                 GOTO(out, rc = 0);
747         }
748
749         /* issue glimpse callbacks to all connected slaves */
750         rc = ldlm_glimpse_locks(res, &gl_list);
751
752         list_for_each_safe(pos, tmp, &gl_list) {
753                 struct ldlm_glimpse_work *work;
754
755                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
756
757                 list_del(&work->gl_list);
758                 CERROR("%s: failed to notify %s of new quota settings\n",
759                        qmt->qmt_svname,
760                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
761                 LDLM_LOCK_RELEASE(work->gl_lock);
762                 OBD_FREE_PTR(work);
763         }
764 out:
765         if (descs)
766                 OBD_FREE(descs,
767                          sizeof(struct ldlm_gl_lquota_desc) * locks_count);
768
769         RETURN(rc);
770 }
771
772 /*
773  * Send glimpse request to all global quota locks to push new quota setting to
774  * slaves.
775  *
776  * \param env - is the environment passed by the caller
777  * \param lqe - is the lquota entry which has new settings
778  * \param ver - is the version associated with the setting change
779  */
780 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
781                          __u64 ver)
782 {
783         struct qmt_thread_info  *qti = qmt_info(env);
784         struct qmt_pool_info    *pool = lqe2qpi(lqe);
785         struct ldlm_resource    *res = NULL;
786         ENTRY;
787
788         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
789
790         /* send glimpse callback to notify slaves of new quota settings */
791         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
792         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
793         if (lqe->lqe_is_default) {
794                 qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
795                 qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
796                 qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
797                                                         LQUOTA_FLAG_DEFAULT);
798
799         } else if (lqe->lqe_is_deleted) {
800                 qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
801                 qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
802                 qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
803                                                         LQUOTA_FLAG_DELETED);
804         } else {
805                 qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
806                 qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
807                 qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
808         }
809         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
810
811         /* look up ldlm resource associated with global index */
812         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
813         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
814                                 LDLM_PLAIN, 0);
815         if (IS_ERR(res)) {
816                 /* this might happen if no slaves have enqueued global quota
817                  * locks yet */
818                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
819                              "with "DFID, PFID(&qti->qti_fid));
820                 RETURN_EXIT;
821         }
822
823         qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
824                          NULL, NULL);
825         ldlm_resource_putref(res);
826         EXIT;
827 }
828
829 /* Callback function used to select locks that should be glimpsed when
830  * broadcasting the new qunit value */
831 static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
832 {
833         struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
834         struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
835         int idx;
836         int stype = qmt_uuid2idx(uuid, &idx);
837
838         LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
839
840         /* Quota pools support only OSTs, despite MDTs also could be registered
841          * as LQUOTA_RES_DT devices(DOM). */
842         if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
843                 return 1;
844         else
845                 return lgd->lqeg_arr[idx].lge_edquot_nu ||
846                        lgd->lqeg_arr[idx].lge_qunit_nu;
847 }
848
849
850 /*
851  * Send glimpse request on per-ID lock to push new qunit value to slave.
852  *
853  * \param env  - is the environment passed by the caller
854  * \param qmt  - is the quota master target device
855  * \param lqe  - is the lquota entry with the new qunit value
856  * \param uuid - is the uuid of the slave acquiring space, if any
857  */
858 static void qmt_id_lock_glimpse(const struct lu_env *env,
859                                 struct qmt_device *qmt,
860                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
861 {
862         struct qmt_thread_info  *qti = qmt_info(env);
863         struct qmt_pool_info    *pool = lqe2qpi(lqe);
864         struct ldlm_resource    *res = NULL;
865         ENTRY;
866
867         if (!lqe->lqe_enforced)
868                 RETURN_EXIT;
869
870         lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
871         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
872         res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
873         if (IS_ERR(res)) {
874                 /* this might legitimately happens if slaves haven't had the
875                  * opportunity to enqueue quota lock yet. */
876                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
877                              "lock "DFID, PFID(&qti->qti_fid));
878                 lqe_write_lock(lqe);
879                 if (lqe->lqe_revoke_time == 0 &&
880                     lqe->lqe_qunit == pool->qpi_least_qunit)
881                         lqe->lqe_revoke_time = ktime_get_seconds();
882                 lqe_write_unlock(lqe);
883                 RETURN_EXIT;
884         }
885
886         lqe_write_lock(lqe);
887         /* The purpose of glimpse callback on per-ID lock is twofold:
888          * - notify slaves of new qunit value and hope they will release some
889          *   spare quota space in return
890          * - notify slaves that master ran out of quota space and there is no
891          *   need to send acquire request any more until further notice */
892
893         /* TODO: it is not clear how to implement below case for all lqes
894          * from where slaves will be notified in qmt_glimpse_lock. Because
895          * here we have just global lqe with an array of OSTs that should
896          * be notified. Theoretically we can find all lqes that includes
897          * these OSTs, but it is not trivial. So I would propose to move
898          * this case to another place ... */
899         if (lqe->lqe_revoke_time == 0 &&
900             lqe->lqe_qunit == pool->qpi_least_qunit)
901                 /* reset lqe_may_rel, it will be updated on glimpse callback
902                  * replies if needed */
903                 lqe->lqe_may_rel = 0;
904
905         /* The rebalance thread is the only thread which can issue glimpses */
906         LASSERT(!lqe->lqe_gl);
907         lqe->lqe_gl = true;
908         lqe_write_unlock(lqe);
909
910         /* issue glimpse callback to slaves */
911         qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
912                          qmt_id_lock_cb, lqe);
913
914         lqe_write_lock(lqe);
915         if (lqe->lqe_revoke_time == 0 &&
916             lqe->lqe_qunit == pool->qpi_least_qunit) {
917                 lqe->lqe_revoke_time = ktime_get_seconds();
918                 qmt_adjust_edquot(lqe, ktime_get_real_seconds());
919         }
920         LASSERT(lqe->lqe_gl);
921         lqe->lqe_gl = false;
922         lqe_write_unlock(lqe);
923
924         ldlm_resource_putref(res);
925         EXIT;
926 }
927
928 /*
929  * Schedule a glimpse request on per-ID locks to push new qunit value or
930  * edquot flag to quota slaves.
931  *
932  * \param qmt  - is the quota master target device
933  * \param lqe  - is the lquota entry with the new qunit value
934  */
935 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
936 {
937         bool    added = false;
938         ENTRY;
939
940         LASSERT(lqe->lqe_is_global);
941         lqe_getref(lqe);
942         spin_lock(&qmt->qmt_reba_lock);
943         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
944                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
945                 added = true;
946                 if (qmt->qmt_reba_task)
947                         wake_up_process(qmt->qmt_reba_task);
948         }
949         spin_unlock(&qmt->qmt_reba_lock);
950
951         if (!added)
952                 lqe_putref(lqe);
953         EXIT;
954 }
955
956 struct qmt_reba_args {
957         struct qmt_device       *qra_dev;
958         struct lu_env            qra_env;
959         struct completion       *qra_started;
960 };
961
962 #ifndef TASK_IDLE
963 #define TASK_IDLE TASK_INTERRUPTIBLE
964 #endif
965
966 /*
967  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
968  * quota locks owned by slaves in order to notify them of:
969  * - a qunit shrink in which case slaves might release quota space back in
970  *   glimpse reply.
971  * - set/clear edquot flag used to cache the "quota exhausted" state of the
972  *   master. When the flag is set, slaves know that there is no need to
973  *   try to acquire quota from the master since this latter has already
974  *   distributed all the space.
975  */
976 static int qmt_reba_thread(void *_args)
977 {
978         struct qmt_reba_args    *args = _args;
979         struct qmt_device       *qmt = args->qra_dev;
980         struct lu_env           *env = &args->qra_env;
981         struct lquota_entry     *lqe, *tmp;
982         ENTRY;
983
984         complete(args->qra_started);
985         while (({set_current_state(TASK_IDLE);
986                  !kthread_should_stop(); })) {
987
988                 spin_lock(&qmt->qmt_reba_lock);
989                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
990                                          lqe_link) {
991                         __set_current_state(TASK_RUNNING);
992                         list_del_init(&lqe->lqe_link);
993                         spin_unlock(&qmt->qmt_reba_lock);
994
995                         if (!kthread_should_stop())
996                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
997
998                         lqe_putref(lqe);
999                         spin_lock(&qmt->qmt_reba_lock);
1000                 }
1001                 spin_unlock(&qmt->qmt_reba_lock);
1002                 schedule();
1003         }
1004         __set_current_state(TASK_RUNNING);
1005
1006         lu_env_remove(env);
1007         lu_env_fini(env);
1008         OBD_FREE_PTR(args);
1009         RETURN(0);
1010 }
1011
1012 /*
1013  * Start rebalance thread. Called when the QMT is being setup
1014  */
1015 int qmt_start_reba_thread(struct qmt_device *qmt)
1016 {
1017         struct task_struct *task;
1018         struct qmt_reba_args *args;
1019         DECLARE_COMPLETION_ONSTACK(started);
1020         int rc;
1021         ENTRY;
1022
1023         OBD_ALLOC_PTR(args);
1024         if (args == NULL)
1025                 RETURN(-ENOMEM);
1026         args->qra_dev = qmt;
1027         args->qra_started = &started;
1028
1029         rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
1030         if (rc) {
1031                 CERROR("%s: failed to init env.\n", qmt->qmt_svname);
1032                 GOTO(out_env, rc);
1033         }
1034
1035         task = kthread_create(qmt_reba_thread, args,
1036                               "qmt_reba_%s", qmt->qmt_svname);
1037         if (IS_ERR(task)) {
1038                 CERROR("%s: failed to start rebalance thread (%ld)\n",
1039                        qmt->qmt_svname, PTR_ERR(task));
1040                 GOTO(out_env_fini, rc = PTR_ERR(task));
1041         }
1042
1043         rc = lu_env_add_task(&args->qra_env, task);
1044         if (rc) {
1045                 kthread_stop(task);
1046                 GOTO(out_env_fini, rc);
1047         }
1048         qmt->qmt_reba_task = task;
1049         wake_up_process(task);
1050         wait_for_completion(&started);
1051
1052         RETURN(0);
1053 out_env_fini:
1054         lu_env_fini(&args->qra_env);
1055 out_env:
1056         OBD_FREE_PTR(args);
1057         RETURN(rc);
1058 }
1059
1060 /*
1061  * Stop rebalance thread. Called when the QMT is about to shutdown.
1062  */
1063 void qmt_stop_reba_thread(struct qmt_device *qmt)
1064 {
1065         struct task_struct *task;
1066
1067         spin_lock(&qmt->qmt_reba_lock);
1068         task = qmt->qmt_reba_task;
1069         qmt->qmt_reba_task = NULL;
1070         spin_unlock(&qmt->qmt_reba_lock);
1071
1072         if (task)
1073                 kthread_stop(task);
1074
1075         LASSERT(list_empty(&qmt->qmt_reba_list));
1076 }