Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
/*
 * Intent policy function called from mdt_intent_opc() when the intent is of
 * quota type. Two intent opcodes are supported:
 * - IT_QUOTA_DQACQ: acquire (or release) quota space on a per-ID lock;
 * - IT_QUOTA_CONN:  new connection from a slave on the global quota lock.
 *
 * \param env   - environment passed by the caller
 * \param ld    - quota master target lu_device
 * \param req   - incoming ldlm enqueue request
 * \param lockp - lock being enqueued
 * \param flags - ldlm flags (not used by this function)
 *
 * \retval ELDLM_OK always; the per-intent status is reported back to the
 *         requester via ldlm_rep->lock_policy_res2 instead.
 */
int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
		      struct ptlrpc_request *req, struct ldlm_lock **lockp,
		      int flags)
{
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	struct ldlm_intent	*it;
	struct quota_body	*reqbody;
	struct quota_body	*repbody;
	struct obd_uuid		*uuid;
	struct lquota_lvb	*lvb;
	struct ldlm_resource	*res = (*lockp)->l_resource;
	struct ldlm_reply	*ldlm_rep;
	int			 rc, lvb_len;
	ENTRY;

	req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     ldlm_lvbo_size(*lockp));

	/* extract quota body and intent opc */
	it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	if (it == NULL)
		RETURN(err_serious(-EFAULT));

	reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (reqbody == NULL)
		RETURN(err_serious(-EFAULT));

	/* prepare reply */
	rc = req_capsule_server_pack(&req->rq_pill);
	if (rc != 0) {
		CERROR("Can't pack response, rc %d\n", rc);
		RETURN(err_serious(rc));
	}

	repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
	if (repbody == NULL)
		RETURN(err_serious(-EFAULT));

	ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (ldlm_rep == NULL)
		RETURN(err_serious(-EFAULT));

	uuid = &(*lockp)->l_export->exp_client_uuid;
	switch (it->opc) {

	case IT_QUOTA_DQACQ: {
		struct lquota_entry	*lqe;
		struct ldlm_lock	*lock;

		/* DQACQ is only valid on per-ID resources (non-zero quota
		 * sequence in the resource name) */
		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
			/* acquire on global lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		/* verify global lock isn't stale */
		if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
			GOTO(out, rc = -ENOLCK);

		lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
		if (lock == NULL)
			GOTO(out, rc = -ENOLCK);
		/* only needed existence proof, drop the reference right away */
		LDLM_LOCK_PUT(lock);

		/* per-ID resource lvb points to the lquota entry, set up by
		 * qmt_lvbo_init() */
		lqe = res->lr_lvb_data;
		LASSERT(lqe != NULL);
		lqe_getref(lqe);

		/* acquire quota space */
		rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
				reqbody->qb_count, reqbody->qb_usage,
				repbody);
		lqe_putref(lqe);
		if (rc)
			GOTO(out, rc);
		break;
	}

	case IT_QUOTA_CONN:
		/* new connection from slave */

		if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
			/* connection on per-ID lock? something is wrong ... */
			GOTO(out, rc = -EPROTO);

		rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
				       &repbody->qb_slv_fid,
				       &repbody->qb_slv_ver, uuid);
		if (rc)
			GOTO(out, rc);
		break;

	default:
		CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
		       it->opc);
		GOTO(out, rc = -EINVAL);
	}

	/* on success, pack lvb in reply */
	lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
	lvb_len = ldlm_lvbo_size(*lockp);
	lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
	if (lvb_len < 0)
		GOTO(out, rc = lvb_len);

	/* shrink the reply buffer to the actual lvb size */
	req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
out:
	/* result is returned in the ldlm reply, not as the rpc status */
	ldlm_rep->lock_policy_res2 = clear_serious(rc);
	EXIT;
	return ELDLM_OK;
}
153
/*
 * Initialize quota LVB associated with quota indexes.
 * Called with res->lr_lvb_sem held.
 *
 * For a per-ID resource, lr_lvb_data is set to the lquota_entry tracking
 * that quota ID; for the global resource, it is set to the global index
 * dt_object. The references taken here are dropped in qmt_lvbo_free().
 *
 * \param ld  - quota master target lu_device
 * \param res - ldlm resource whose lvb is being set up
 *
 * \retval 0 on success (or when there is nothing to do), negative errno
 *         otherwise
 */
int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
{
	struct lu_env		*env;
	struct qmt_thread_info	*qti;
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	int			 pool_type, qtype;
	int			 rc;
	ENTRY;

	LASSERT(res != NULL);

	/* quota locks are always PLAIN locks */
	if (res->lr_type != LDLM_PLAIN)
		RETURN(-ENOTSUPP);

	/* nothing to do if lvb is already initialized or the resource does
	 * not belong to the quota global index sequence */
	if (res->lr_lvb_data ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
		RETURN(0);

	env = lu_env_find();
	LASSERT(env);
	qti = qmt_info(env);

	/* extract global index FID and quota identifier */
	fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);

	/* sanity check the global index FID */
	rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
	if (rc) {
		CERROR("can't extract glb index information from FID "DFID"\n",
		       PFID(&qti->qti_fid));
		GOTO(out, rc);
	}

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock associated with UID/GID 0 or with a seq 0,
		 * we are thus dealing with an ID lock. */
		struct lquota_entry	*lqe;

		/* Find the quota entry associated with the quota id */
		lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
					  &qti->qti_id);
		if (IS_ERR(lqe))
			GOTO(out, rc = PTR_ERR(lqe));

		/* store reference to lqe in lr_lvb_data */
		res->lr_lvb_data = lqe;
		LQUOTA_DEBUG(lqe, "initialized res lvb");
	} else {
		struct dt_object	*obj;

		/* lookup global index */
		obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
		if (IS_ERR(obj))
			GOTO(out, rc = PTR_ERR(obj));
		if (!dt_object_exists(obj)) {
			dt_object_put(env, obj);
			GOTO(out, rc = -ENOENT);
		}

		/* store reference to global index object in lr_lvb_data */
		res->lr_lvb_data = obj;
		CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
	}

	/* both lock types share the same lvb format */
	res->lr_lvb_len = sizeof(struct lquota_lvb);
	EXIT;
out:
	return rc;
}
227
/*
 * Update LVB associated with the global quota index.
 * This function is called from the DLM itself after a glimpse callback, in
 * this case a valid ptlrpc request is passed: the lvb packed in the glimpse
 * reply reports how much space the slave released (or may release later),
 * and that released space is given back to the master via qmt_dqacq0().
 *
 * \param ld            - quota master target lu_device
 * \param res           - per-ID ldlm resource (global locks are ignored)
 * \param req           - glimpse reply carrying the slave's lvb; NULL means
 *                        nothing to do
 * \param increase_only - unused by quota lvb updates
 *
 * \retval 0 on success, negative errno on failure
 */
int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
		    struct ptlrpc_request *req, int increase_only)
{
	struct lu_env		*env;
	struct qmt_thread_info	*qti;
	struct qmt_device	*qmt = lu2qmt_dev(ld);
	struct lquota_entry	*lqe;
	struct lquota_lvb	*lvb;
	struct ldlm_lock	*lock;
	struct obd_export	*exp;
	int			 rc = 0;
	ENTRY;

	LASSERT(res != NULL);

	if (req == NULL)
		RETURN(0);

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
		/* no need to update lvb for global quota locks */
		RETURN(0);

	lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
					  lustre_swab_lquota_lvb);
	if (lvb == NULL) {
		CERROR("%s: failed to extract lvb from request\n",
		       qmt->qmt_svname);
		RETURN(-EFAULT);
	}

	/* per-ID resource lvb points to the lquota entry (qmt_lvbo_init) */
	lqe = res->lr_lvb_data;
	LASSERT(lqe != NULL);
	lqe_getref(lqe);

	LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
		     lvb->lvb_id_rel, lvb->lvb_id_may_rel);

	if (lvb->lvb_id_rel == 0) {
		/* nothing to release */
		if (lvb->lvb_id_may_rel != 0)
			/* but might still release later ... */
			lqe->lqe_may_rel += lvb->lvb_id_may_rel;
		GOTO(out, rc = 0);
	}

	/* allocate environement */
	env = lu_env_find();
	LASSERT(env);
	qti = qmt_info(env);

	/* The request is a glimpse callback which was sent via the
	 * reverse import to the slave. What we care about here is the
	 * export associated with the slave and req->rq_export is
	 * definitely not what we are looking for (it is actually set to
	 * NULL here).
	 * Therefore we extract the lock from the request argument
	 * and use lock->l_export. */
	lock = ldlm_request_lock(req);
	if (IS_ERR(lock)) {
		CERROR("%s: failed to get lock from request!\n",
		       qmt->qmt_svname);
		GOTO(out, rc = PTR_ERR(lock));
	}

	exp = class_export_get(lock->l_export);
	if (exp == NULL) {
		CERROR("%s: failed to get export from lock!\n",
		       qmt->qmt_svname);
		GOTO(out, rc = -EFAULT);
	}

	/* release quota space */
	rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
			QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
	/* the amount actually released should match what the slave reported */
	if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
		LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
			     "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
			     lvb->lvb_id_rel, rc);
	class_export_put(exp);
	if (rc)
		GOTO(out, rc);
	EXIT;
out:
	lqe_putref(lqe);
	return rc;
}
319
/*
 * Report size of lvb to ldlm layer in order to allocate lvb buffer
 * As far as quota locks are concerned, the size is static and is the same
 * for both global and per-ID locks which shares the same lvb format.
 *
 * \param ld   - quota master target lu_device (unused)
 * \param lock - lock for which the lvb size is requested (unused)
 */
int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
{
	/* all quota locks use a struct lquota_lvb */
	return sizeof(struct lquota_lvb);
}
329
/*
 * Fill request buffer with quota lvb.
 *
 * For a per-ID lock, the lvb carries the current qunit value and the edquot
 * flag of the lquota entry; for the global lock it carries the version of
 * the global index object.
 *
 * \param ld     - quota master target lu_device (unused)
 * \param lock   - lock whose resource identifies the quota object
 * \param lvb    - output buffer, interpreted as a struct lquota_lvb
 * \param lvblen - size of \a lvb (assumed >= sizeof(struct lquota_lvb);
 *                 not checked here — TODO confirm callers guarantee this)
 *
 * \retval number of bytes written on success, negative errno on error
 */
int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
		  int lvblen)
{
	struct ldlm_resource	*res = lock->l_resource;
	struct lquota_lvb	*qlvb = lvb;
	ENTRY;

	LASSERT(res != NULL);

	/* only quota resources with an initialized lvb are supported */
	if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
	    res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
		RETURN(-EINVAL);

	if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
		/* no ID quota lock associated with UID/GID 0 or with a seq 0,
		 * we are thus dealing with an ID lock. */
		struct lquota_entry	*lqe = res->lr_lvb_data;

		/* return current qunit value & edquot flags in lvb */
		lqe_getref(lqe);
		qlvb->lvb_id_qunit = lqe->lqe_qunit;
		qlvb->lvb_flags = 0;
		if (lqe->lqe_edquot)
			qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
		lqe_putref(lqe);
	} else {
		/* global quota lock */
		struct dt_object *obj = res->lr_lvb_data;

		/* return current version of global index */
		qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj);
	}

	RETURN(sizeof(struct lquota_lvb));
}
368
369 /*
370  * Free lvb associated with a given ldlm resource
371  * we don't really allocate a lvb, lr_lvb_data just points to
372  * the appropriate backend structures.
373  */
374 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
375 {
376         ENTRY;
377
378         if (res->lr_lvb_data == NULL)
379                 RETURN(0);
380
381         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
382                 struct lquota_entry     *lqe = res->lr_lvb_data;
383
384                 /* release lqe reference */
385                 lqe_putref(lqe);
386         } else {
387                 struct dt_object *obj = res->lr_lvb_data;
388                 /* release object reference */
389                 dt_object_put(lu_env_find(), obj);
390         }
391
392         res->lr_lvb_data = NULL;
393         res->lr_lvb_len  = 0;
394
395         RETURN(0);
396 }
397
/* Callback invoked on each granted lock during a glimpse scan; returning 0
 * means the corresponding slave should NOT be notified. */
typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);

/* Growable array of lock references collected for glimpsing. */
struct qmt_gl_lock_array {
	unsigned long		  q_max;   /* slots allocated in q_locks */
	unsigned long		  q_cnt;   /* slots currently in use */
	struct ldlm_lock	**q_locks; /* references (LDLM_LOCK_GET) */
};
405
406 static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
407 {
408         int i;
409
410         if (array->q_max == 0) {
411                 LASSERT(array->q_locks == NULL);
412                 return;
413         }
414
415         for (i = 0; i < array->q_cnt; i++) {
416                 LASSERT(array->q_locks[i]);
417                 LDLM_LOCK_RELEASE(array->q_locks[i]);
418                 array->q_locks[i] = NULL;
419         }
420         array->q_cnt = 0;
421         OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
422         array->q_locks = NULL;
423         array->q_max = 0;
424 }
425
/*
 * Collect references on all granted locks of \a res that pass the \a cb
 * filter (every granted lock when \a cb is NULL).
 *
 * Since the number of qualifying locks is unknown up front, the list is
 * scanned once to count, the array is (re)allocated with some headroom, and
 * the scan is retried; after 5 failed attempts to fit all locks -EAGAIN is
 * returned.
 *
 * \param res   - ldlm resource whose granted list is scanned
 * \param array - empty array to populate (caller must have zeroed it)
 * \param cb    - optional filter; a 0 return excludes the lock
 * \param arg   - opaque argument passed to \a cb
 *
 * \retval 0 on success, -EAGAIN or -ENOMEM on failure
 */
static int qmt_alloc_lock_array(struct ldlm_resource *res,
				struct qmt_gl_lock_array *array,
				qmt_glimpse_cb_t cb, void *arg)
{
	struct list_head *pos;
	unsigned long count = 0;
	int fail_cnt = 0;
	ENTRY;

	LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
again:
	lock_res(res);
	/* scan list of granted locks */
	list_for_each(pos, &res->lr_granted) {
		struct ldlm_lock *lock;
		int rc;

		lock = list_entry(pos, struct ldlm_lock, l_res_link);
		LASSERT(lock->l_export);

		if (cb != NULL) {
			rc = cb(lock, arg);
			/* slave should not be notified */
			if (rc == 0)
				continue;
		}

		/* on the first pass q_max is 0, so we only count here and
		 * store references once the array has been allocated */
		count++;
		if (array->q_max != 0 && array->q_cnt < array->q_max) {
			array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
			array->q_cnt++;
		}
	}
	unlock_res(res);

	if (count > array->q_max) {
		/* locks were granted while the res was unlocked, start over
		 * with a bigger array */
		qmt_free_lock_array(array);
		if (++fail_cnt > 5)
			RETURN(-EAGAIN);
		/*
		 * allocate more slots in case of more qualified locks are
		 * found during next loop
		 */
		array->q_max = count + count / 2 + 10;
		count = 0;
		LASSERT(array->q_locks == NULL && array->q_cnt == 0);
		OBD_ALLOC(array->q_locks,
			  sizeof(*array->q_locks) * array->q_max);
		if (array->q_locks == NULL) {
			array->q_max = 0;
			RETURN(-ENOMEM);
		}

		goto again;
	}
	RETURN(0);
}
483
/*
 * Send glimpse callback to slaves holding a lock on resource \res.
 * This is used to notify slaves of new quota settings or to claim quota space
 * back.
 *
 * \param env  - is the environment passed by the caller
 * \param qmt  - is the quota master target
 * \param res  - is the dlm resource associated with the quota object
 * \param desc - is the glimpse descriptor to pack in glimpse callback
 * \param cb   - is the callback function called on every lock and determine
 *               whether a glimpse should be issued
 * \param arg  - is an opaq parameter passed to the callback function
 */
static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
			    struct ldlm_resource *res, union ldlm_gl_desc *desc,
			    qmt_glimpse_cb_t cb, void *arg)
{
	struct list_head *tmp, *pos;
	struct list_head gl_list = LIST_HEAD_INIT(gl_list);
	struct qmt_gl_lock_array locks;
	unsigned long i;
	int rc = 0;
	ENTRY;

	/* grab references on all locks that should be glimpsed */
	memset(&locks, 0, sizeof(locks));
	rc = qmt_alloc_lock_array(res, &locks, cb, arg);
	if (rc) {
		CERROR("%s: failed to allocate glimpse lock array (%d)\n",
		       qmt->qmt_svname, rc);
		RETURN(rc);
	}

	/* build a glimpse work item per lock, transferring each lock
	 * reference from the array to the work item */
	for (i = locks.q_cnt; i > 0; i--) {
		struct ldlm_glimpse_work *work;

		OBD_ALLOC_PTR(work);
		if (work == NULL) {
			/* best effort: skip this lock and keep notifying the
			 * others */
			CERROR("%s: failed to notify a lock.\n",
			       qmt->qmt_svname);
			continue;
		}

		list_add_tail(&work->gl_list, &gl_list);
		work->gl_lock  = locks.q_locks[i - 1];
		work->gl_flags = 0;
		work->gl_desc  = desc;

		locks.q_locks[i - 1] = NULL;
		locks.q_cnt--;
	}

	/* frees slots of locks that couldn't get a work item (if any) */
	qmt_free_lock_array(&locks);

	if (list_empty(&gl_list)) {
		CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
		RETURN(0);
	}

	/* issue glimpse callbacks to all connected slaves */
	rc = ldlm_glimpse_locks(res, &gl_list);

	/* NOTE(review): entries still on gl_list after ldlm_glimpse_locks()
	 * are presumably the ones that could not be sent — confirm against
	 * ldlm_glimpse_locks() semantics; they are logged and cleaned up
	 * here */
	list_for_each_safe(pos, tmp, &gl_list) {
		struct ldlm_glimpse_work *work;

		work = list_entry(pos, struct ldlm_glimpse_work, gl_list);

		list_del(&work->gl_list);
		CERROR("%s: failed to notify %s of new quota settings\n",
		       qmt->qmt_svname,
		       obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
		LDLM_LOCK_RELEASE(work->gl_lock);
		OBD_FREE_PTR(work);
	}

	RETURN(rc);
}
560
/*
 * Send glimpse request to all global quota locks to push new quota setting to
 * slaves.
 *
 * \param env - is the environment passed by the caller
 * \param lqe - is the lquota entry which has new settings
 * \param ver - is the version associated with the setting change
 */
void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
			 __u64 ver)
{
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;
	ENTRY;

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
			    lqe->lqe_site->lqs_qtype);

	/* send glimpse callback to notify slaves of new quota settings */
	qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
	qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
	if (lqe->lqe_is_default) {
		/* default quota: zero limits, flag the grace time so slaves
		 * know the entry follows the default settings */
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
		qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
							LQUOTA_FLAG_DEFAULT);

	} else {
		qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
		qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
		qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
	}
	qti->qti_gl_desc.lquota_desc.gl_ver       = ver;

	/* look up ldlm resource associated with global index */
	fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
	res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
				LDLM_PLAIN, 0);
	if (IS_ERR(res)) {
		/* this might happen if no slaves have enqueued global quota
		 * locks yet */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
			     "with "DFID, PFID(&qti->qti_fid));
		RETURN_EXIT;
	}

	/* no filter callback: notify every slave holding the global lock */
	qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
			 NULL, NULL);
	ldlm_resource_putref(res);
	EXIT;
}
613
614 /* Callback function used to select locks that should be glimpsed when
615  * broadcasting the new qunit value */
616 static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
617 {
618         struct obd_uuid *slv_uuid = arg;
619         struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
620
621         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
622                 RETURN(0);
623         RETURN(+1);
624 }
625
/*
 * Send glimpse request on per-ID lock to push new qunit value to slave.
 *
 * \param env  - is the environment passed by the caller
 * \param qmt  - is the quota master target device
 * \param lqe  - is the lquota entry with the new qunit value
 * \param uuid - is the uuid of the slave acquiring space, if any
 */
static void qmt_id_lock_glimpse(const struct lu_env *env,
				struct qmt_device *qmt,
				struct lquota_entry *lqe, struct obd_uuid *uuid)
{
	struct qmt_thread_info	*qti = qmt_info(env);
	struct qmt_pool_info	*pool = lqe2qpi(lqe);
	struct ldlm_resource	*res = NULL;
	ENTRY;

	/* no glimpse needed if the ID isn't enforced */
	if (!lqe->lqe_enforced)
		RETURN_EXIT;

	lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
			    lqe->lqe_site->lqs_qtype);
	fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
	res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
				0);
	if (IS_ERR(res)) {
		/* this might legitimately happens if slaves haven't had the
		 * opportunity to enqueue quota lock yet. */
		LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
			     "lock "DFID, PFID(&qti->qti_fid));
		/* even with nobody to glimpse, record when the least qunit
		 * was reached so edquot adjustment can proceed */
		lqe_write_lock(lqe);
		if (lqe->lqe_revoke_time == 0 &&
		    lqe->lqe_qunit == pool->qpi_least_qunit)
			lqe->lqe_revoke_time = ktime_get_seconds();
		lqe_write_unlock(lqe);
		RETURN_EXIT;
	}

	lqe_write_lock(lqe);
	/* The purpose of glimpse callback on per-ID lock is twofold:
	 * - notify slaves of new qunit value and hope they will release some
	 *   spare quota space in return
	 * - notify slaves that master ran out of quota space and there is no
	 *   need to send acquire request any more until further notice */

	/* fill glimpse descriptor with lqe settings */
	if (lqe->lqe_edquot)
		qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
	else
		qti->qti_gl_desc.lquota_desc.gl_flags = 0;
	qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;

	if (lqe->lqe_revoke_time == 0 &&
	    qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
		/* reset lqe_may_rel, it will be updated on glimpse callback
		 * replies if needed */
		lqe->lqe_may_rel = 0;

	/* The rebalance thread is the only thread which can issue glimpses */
	LASSERT(!lqe->lqe_gl);
	lqe->lqe_gl = true;
	lqe_write_unlock(lqe);

	/* issue glimpse callback to slaves; skip the slave identified by
	 * \a uuid (if any) since it is the one currently acquiring space */
	qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
			 uuid ? qmt_id_lock_cb : NULL, (void *)uuid);

	lqe_write_lock(lqe);
	/* glimpse replies may have changed lqe_qunit, so re-check it still
	 * sits at the least qunit before recording the revoke time */
	if (lqe->lqe_revoke_time == 0 &&
	    qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
	    lqe->lqe_qunit == pool->qpi_least_qunit) {
		lqe->lqe_revoke_time = ktime_get_seconds();
		qmt_adjust_edquot(lqe, ktime_get_real_seconds());
	}
	LASSERT(lqe->lqe_gl);
	lqe->lqe_gl = false;
	lqe_write_unlock(lqe);

	ldlm_resource_putref(res);
	EXIT;
}
707
708 /*
709  * Schedule a glimpse request on per-ID locks to push new qunit value or
710  * edquot flag to quota slaves.
711  *
712  * \param qmt  - is the quota master target device
713  * \param lqe  - is the lquota entry with the new qunit value
714  */
715 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
716 {
717         bool    added = false;
718         ENTRY;
719
720         lqe_getref(lqe);
721         spin_lock(&qmt->qmt_reba_lock);
722         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
723                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
724                 added = true;
725         }
726         spin_unlock(&qmt->qmt_reba_lock);
727
728         if (added)
729                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
730         else
731                 lqe_putref(lqe);
732         EXIT;
733 }
734
735 /*
736  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
737  * quota locks owned by slaves in order to notify them of:
738  * - a qunit shrink in which case slaves might release quota space back in
739  *   glimpse reply.
740  * - set/clear edquot flag used to cache the "quota exhausted" state of the
741  *   master. When the flag is set, slaves know that there is no need to
742  *   try to acquire quota from the master since this latter has already
743  *   distributed all the space.
744  */
745 static int qmt_reba_thread(void *arg)
746 {
747         struct qmt_device       *qmt = (struct qmt_device *)arg;
748         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
749         struct l_wait_info       lwi = { 0 };
750         struct lu_env           *env;
751         struct lquota_entry     *lqe, *tmp;
752         int                      rc;
753         ENTRY;
754
755         OBD_ALLOC_PTR(env);
756         if (env == NULL) {
757                 thread_set_flags(thread, SVC_STOPPED);
758                 RETURN(-ENOMEM);
759         }
760
761         rc = lu_env_init(env, LCT_MD_THREAD);
762         if (rc) {
763                 CERROR("%s: failed to init env.", qmt->qmt_svname);
764                 GOTO(out_env, rc);
765         }
766         rc = lu_env_add(env);
767         if (rc)
768                 GOTO(out_env_fini, rc);
769
770         thread_set_flags(thread, SVC_RUNNING);
771         wake_up(&thread->t_ctl_waitq);
772
773         while (1) {
774                 l_wait_event(thread->t_ctl_waitq,
775                              !list_empty(&qmt->qmt_reba_list) ||
776                              !thread_is_running(thread), &lwi);
777
778                 spin_lock(&qmt->qmt_reba_lock);
779                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
780                                          lqe_link) {
781                         list_del_init(&lqe->lqe_link);
782                         spin_unlock(&qmt->qmt_reba_lock);
783
784                         if (thread_is_running(thread))
785                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
786
787                         lqe_putref(lqe);
788                         spin_lock(&qmt->qmt_reba_lock);
789                 }
790                 spin_unlock(&qmt->qmt_reba_lock);
791
792                 if (!thread_is_running(thread))
793                         break;
794         }
795         lu_env_remove(env);
796 out_env_fini:
797         lu_env_fini(env);
798 out_env:
799         OBD_FREE_PTR(env);
800         thread_set_flags(thread, SVC_STOPPED);
801         wake_up(&thread->t_ctl_waitq);
802         RETURN(rc);
803 }
804
/*
 * Start rebalance thread. Called when the QMT is being setup.
 *
 * \param qmt - quota master target device
 *
 * \retval 0 once the thread reached the running or stopped state,
 *         negative errno if the thread could not be spawned.
 *
 * NOTE(review): if the thread starts but fails its own setup (env init),
 * it ends up stopped and this function still returns 0 — confirm whether
 * callers need the thread's failure to be propagated here.
 */
int qmt_start_reba_thread(struct qmt_device *qmt)
{
	struct ptlrpc_thread	*thread = &qmt->qmt_reba_thread;
	struct l_wait_info	 lwi    = { 0 };
	struct task_struct		*task;
	ENTRY;

	task = kthread_run(qmt_reba_thread, (void *)qmt,
			       "qmt_reba_%s", qmt->qmt_svname);
	if (IS_ERR(task)) {
		CERROR("%s: failed to start rebalance thread (%ld)\n",
		       qmt->qmt_svname, PTR_ERR(task));
		thread_set_flags(thread, SVC_STOPPED);
		RETURN(PTR_ERR(task));
	}

	/* wait until the thread announces it is running (or died early) */
	l_wait_event(thread->t_ctl_waitq,
		     thread_is_running(thread) || thread_is_stopped(thread),
		     &lwi);

	RETURN(0);
}
830
831 /*
832  * Stop rebalance thread. Called when the QMT is about to shutdown.
833  */
834 void qmt_stop_reba_thread(struct qmt_device *qmt)
835 {
836         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
837
838         if (!thread_is_stopped(thread)) {
839                 struct l_wait_info lwi = { 0 };
840
841                 thread_set_flags(thread, SVC_STOPPING);
842                 wake_up(&thread->t_ctl_waitq);
843
844                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
845                              &lwi);
846         }
847         LASSERT(list_empty(&qmt->qmt_reba_list));
848 }