Whamcloud - gitweb
LU-10308 misc: update Intel copyright messages for 2017
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
38
39 #include "qmt_internal.h"
40
41 /* intent policy function called from mdt_intent_opc() when the intent is of
42  * quota type */
43 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
44                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
45                       int flags)
46 {
47         struct qmt_device       *qmt = lu2qmt_dev(ld);
48         struct ldlm_intent      *it;
49         struct quota_body       *reqbody;
50         struct quota_body       *repbody;
51         struct obd_uuid         *uuid;
52         struct lquota_lvb       *lvb;
53         struct ldlm_resource    *res = (*lockp)->l_resource;
54         struct ldlm_reply       *ldlm_rep;
55         int                      rc, lvb_len;
56         ENTRY;
57
58         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60                              ldlm_lvbo_size(*lockp));
61
62         /* extract quota body and intent opc */
63         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
64         if (it == NULL)
65                 RETURN(err_serious(-EFAULT));
66
67         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
68         if (reqbody == NULL)
69                 RETURN(err_serious(-EFAULT));
70
71         /* prepare reply */
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc != 0) {
74                 CERROR("Can't pack response, rc %d\n", rc);
75                 RETURN(err_serious(rc));
76         }
77
78         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
79         if (repbody == NULL)
80                 RETURN(err_serious(-EFAULT));
81
82         ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
83         if (ldlm_rep == NULL)
84                 RETURN(err_serious(-EFAULT));
85
86         uuid = &(*lockp)->l_export->exp_client_uuid;
87         switch (it->opc) {
88
89         case IT_QUOTA_DQACQ: {
90                 struct lquota_entry     *lqe;
91                 struct ldlm_lock        *lock;
92
93                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
94                         /* acquire on global lock? something is wrong ... */
95                         GOTO(out, rc = -EPROTO);
96
97                 /* verify global lock isn't stale */
98                 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
99                         GOTO(out, rc = -ENOLCK);
100
101                 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
102                 if (lock == NULL)
103                         GOTO(out, rc = -ENOLCK);
104                 LDLM_LOCK_PUT(lock);
105
106                 lqe = res->lr_lvb_data;
107                 LASSERT(lqe != NULL);
108                 lqe_getref(lqe);
109
110                 /* acquire quota space */
111                 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
112                                 reqbody->qb_count, reqbody->qb_usage,
113                                 repbody);
114                 lqe_putref(lqe);
115                 if (rc)
116                         GOTO(out, rc);
117                 break;
118         }
119
120         case IT_QUOTA_CONN:
121                 /* new connection from slave */
122
123                 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
124                         /* connection on per-ID lock? something is wrong ... */
125                         GOTO(out, rc = -EPROTO);
126
127                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
128                                        &repbody->qb_slv_fid,
129                                        &repbody->qb_slv_ver, uuid);
130                 if (rc)
131                         GOTO(out, rc);
132                 break;
133
134         default:
135                 CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
136                        it->opc);
137                 GOTO(out, rc = -EINVAL);
138         }
139
140         /* on success, pack lvb in reply */
141         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
142         lvb_len = ldlm_lvbo_size(*lockp);
143         lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
144         if (lvb_len < 0)
145                 GOTO(out, rc = lvb_len);
146
147         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
148 out:
149         ldlm_rep->lock_policy_res2 = clear_serious(rc);
150         EXIT;
151         return ELDLM_OK;
152 }
153
154 /*
155  * Initialize quota LVB associated with quota indexes.
156  * Called with res->lr_lvb_sem held
157  */
158 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
159 {
160         struct lu_env           *env;
161         struct qmt_thread_info  *qti;
162         struct qmt_device       *qmt = lu2qmt_dev(ld);
163         int                      pool_id, pool_type, qtype;
164         int                      rc;
165         ENTRY;
166
167         LASSERT(res != NULL);
168
169         if (res->lr_type != LDLM_PLAIN)
170                 RETURN(-ENOTSUPP);
171
172         if (res->lr_lvb_data ||
173             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
174                 RETURN(0);
175
176         OBD_ALLOC_PTR(env);
177         if (env == NULL)
178                 RETURN(-ENOMEM);
179
180         /* initialize environment */
181         rc = lu_env_init(env, LCT_MD_THREAD);
182         if (rc != 0)
183                 GOTO(out_free, rc);
184         qti = qmt_info(env);
185
186         /* extract global index FID and quota identifier */
187         fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
188
189         /* sanity check the global index FID */
190         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
191         if (rc) {
192                 CERROR("can't extract pool information from FID "DFID"\n",
193                        PFID(&qti->qti_fid));
194                 GOTO(out, rc);
195         }
196
197         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
198                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
199                  * we are thus dealing with an ID lock. */
200                 struct lquota_entry     *lqe;
201
202                 /* Find the quota entry associated with the quota id */
203                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
204                                           &qti->qti_id);
205                 if (IS_ERR(lqe))
206                         GOTO(out, rc = PTR_ERR(lqe));
207
208                 /* store reference to lqe in lr_lvb_data */
209                 res->lr_lvb_data = lqe;
210                 LQUOTA_DEBUG(lqe, "initialized res lvb");
211         } else {
212                 struct dt_object        *obj;
213
214                 /* lookup global index */
215                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
216                 if (IS_ERR(obj))
217                         GOTO(out, rc = PTR_ERR(obj));
218                 if (!dt_object_exists(obj)) {
219                         dt_object_put(env, obj);
220                         GOTO(out, rc = -ENOENT);
221                 }
222
223                 /* store reference to global index object in lr_lvb_data */
224                 res->lr_lvb_data = obj;
225                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
226         }
227
228         res->lr_lvb_len = sizeof(struct lquota_lvb);
229         EXIT;
230 out:
231         lu_env_fini(env);
232 out_free:
233         OBD_FREE_PTR(env);
234         return rc;
235 }
236
237 /*
238  * Update LVB associated with the global quota index.
239  * This function is called from the DLM itself after a glimpse callback, in this
240  * case valid ptlrpc request is passed.
241  */
242 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
243                     struct ptlrpc_request *req, int increase_only)
244 {
245         struct lu_env           *env;
246         struct qmt_thread_info  *qti;
247         struct qmt_device       *qmt = lu2qmt_dev(ld);
248         struct lquota_entry     *lqe;
249         struct lquota_lvb       *lvb;
250         struct ldlm_lock        *lock;
251         struct obd_export       *exp;
252         int                      rc = 0;
253         ENTRY;
254
255         LASSERT(res != NULL);
256
257         if (req == NULL)
258                 RETURN(0);
259
260         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
261                 /* no need to update lvb for global quota locks */
262                 RETURN(0);
263
264         lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
265                                           lustre_swab_lquota_lvb);
266         if (lvb == NULL) {
267                 CERROR("%s: failed to extract lvb from request\n",
268                        qmt->qmt_svname);
269                 RETURN(-EFAULT);
270         }
271
272         lqe = res->lr_lvb_data;
273         LASSERT(lqe != NULL);
274         lqe_getref(lqe);
275
276         LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
277                      lvb->lvb_id_rel, lvb->lvb_id_may_rel);
278
279         if (lvb->lvb_id_rel == 0) {
280                 /* nothing to release */
281                 if (lvb->lvb_id_may_rel != 0)
282                         /* but might still release later ... */
283                         lqe->lqe_may_rel += lvb->lvb_id_may_rel;
284                 GOTO(out_lqe, rc = 0);
285         }
286
287         /* allocate environement */
288         OBD_ALLOC_PTR(env);
289         if (env == NULL)
290                 GOTO(out_lqe, rc = -ENOMEM);
291
292         /* initialize environment */
293         rc = lu_env_init(env, LCT_MD_THREAD);
294         if (rc)
295                 GOTO(out_env, rc);
296         qti = qmt_info(env);
297
298         /* The request is a glimpse callback which was sent via the
299          * reverse import to the slave. What we care about here is the
300          * export associated with the slave and req->rq_export is
301          * definitely not what we are looking for (it is actually set to
302          * NULL here).
303          * Therefore we extract the lock from the request argument
304          * and use lock->l_export. */
305         lock = ldlm_request_lock(req);
306         if (IS_ERR(lock)) {
307                 CERROR("%s: failed to get lock from request!\n",
308                        qmt->qmt_svname);
309                 GOTO(out_env_init, rc = PTR_ERR(lock));
310         }
311
312         exp = class_export_get(lock->l_export);
313         if (exp == NULL) {
314                 CERROR("%s: failed to get export from lock!\n",
315                        qmt->qmt_svname);
316                 GOTO(out_env_init, rc = -EFAULT);
317         }
318
319         /* release quota space */
320         rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
321                         QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
322         if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
323                 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
324                              "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
325                              lvb->lvb_id_rel, rc);
326         class_export_put(exp);
327         if (rc)
328                 GOTO(out_env_init, rc);
329         EXIT;
330 out_env_init:
331         lu_env_fini(env);
332 out_env:
333         OBD_FREE_PTR(env);
334 out_lqe:
335         lqe_putref(lqe);
336         return rc;
337 }
338
339 /*
340  * Report size of lvb to ldlm layer in order to allocate lvb buffer
341  * As far as quota locks are concerned, the size is static and is the same
342  * for both global and per-ID locks which shares the same lvb format.
343  */
344 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
345 {
346         return sizeof(struct lquota_lvb);
347 }
348
349 /*
350  * Fill request buffer with quota lvb
351  */
352 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
353                   int lvblen)
354 {
355         struct ldlm_resource    *res = lock->l_resource;
356         struct lquota_lvb       *qlvb = lvb;
357         ENTRY;
358
359         LASSERT(res != NULL);
360
361         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
362             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
363                 RETURN(-EINVAL);
364
365         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
366                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
367                  * we are thus dealing with an ID lock. */
368                 struct lquota_entry     *lqe = res->lr_lvb_data;
369
370                 /* return current qunit value & edquot flags in lvb */
371                 lqe_getref(lqe);
372                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
373                 qlvb->lvb_flags = 0;
374                 if (lqe->lqe_edquot)
375                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
376                 lqe_putref(lqe);
377         } else {
378                 /* global quota lock */
379                 struct lu_env           *env;
380                 int                      rc;
381                 struct dt_object        *obj = res->lr_lvb_data;
382
383                 OBD_ALLOC_PTR(env);
384                 if (env == NULL)
385                         RETURN(-ENOMEM);
386
387                 /* initialize environment */
388                 rc = lu_env_init(env, LCT_LOCAL);
389                 if (rc) {
390                         OBD_FREE_PTR(env);
391                         RETURN(rc);
392                 }
393
394                 /* return current version of global index */
395                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
396
397                 lu_env_fini(env);
398                 OBD_FREE_PTR(env);
399         }
400
401         RETURN(sizeof(struct lquota_lvb));
402 }
403
404 /*
405  * Free lvb associated with a given ldlm resource
406  * we don't really allocate a lvb, lr_lvb_data just points to
407  * the appropriate backend structures.
408  */
409 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
410 {
411         ENTRY;
412
413         if (res->lr_lvb_data == NULL)
414                 RETURN(0);
415
416         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
417                 struct lquota_entry     *lqe = res->lr_lvb_data;
418
419                 /* release lqe reference */
420                 lqe_putref(lqe);
421         } else {
422                 struct dt_object        *obj = res->lr_lvb_data;
423                 struct lu_env           *env;
424                 int                      rc;
425
426                 OBD_ALLOC_PTR(env);
427                 if (env == NULL)
428                         RETURN(-ENOMEM);
429
430                 /* initialize environment */
431                 rc = lu_env_init(env, LCT_LOCAL);
432                 if (rc) {
433                         OBD_FREE_PTR(env);
434                         RETURN(rc);
435                 }
436
437                 /* release object reference */
438                 dt_object_put(env, obj);
439                 lu_env_fini(env);
440                 OBD_FREE_PTR(env);
441         }
442
443         res->lr_lvb_data = NULL;
444         res->lr_lvb_len  = 0;
445
446         RETURN(0);
447 }
448
449 typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
450
451 struct qmt_gl_lock_array {
452         unsigned long             q_max;
453         unsigned long             q_cnt;
454         struct ldlm_lock        **q_locks;
455 };
456
457 static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
458 {
459         int i;
460
461         if (array->q_max == 0) {
462                 LASSERT(array->q_locks == NULL);
463                 return;
464         }
465
466         for (i = 0; i < array->q_cnt; i++) {
467                 LASSERT(array->q_locks[i]);
468                 LDLM_LOCK_RELEASE(array->q_locks[i]);
469                 array->q_locks[i] = NULL;
470         }
471         array->q_cnt = 0;
472         OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
473         array->q_locks = NULL;
474         array->q_max = 0;
475 }
476
477 static int qmt_alloc_lock_array(struct ldlm_resource *res,
478                                 struct qmt_gl_lock_array *array,
479                                 qmt_glimpse_cb_t cb, void *arg)
480 {
481         struct list_head *pos;
482         unsigned long count = 0;
483         int fail_cnt = 0;
484         ENTRY;
485
486         LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
487 again:
488         lock_res(res);
489         /* scan list of granted locks */
490         list_for_each(pos, &res->lr_granted) {
491                 struct ldlm_lock *lock;
492                 int rc;
493
494                 lock = list_entry(pos, struct ldlm_lock, l_res_link);
495                 LASSERT(lock->l_export);
496
497                 if (cb != NULL) {
498                         rc = cb(lock, arg);
499                         /* slave should not be notified */
500                         if (rc == 0)
501                                 continue;
502                 }
503
504                 count++;
505                 if (array->q_max != 0 && array->q_cnt < array->q_max) {
506                         array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
507                         array->q_cnt++;
508                 }
509         }
510         unlock_res(res);
511
512         if (count > array->q_max) {
513                 qmt_free_lock_array(array);
514                 if (++fail_cnt > 5)
515                         RETURN(-EAGAIN);
516                 /*
517                  * allocate more slots in case of more qualified locks are
518                  * found during next loop
519                  */
520                 array->q_max = count + count / 2 + 10;
521                 count = 0;
522                 LASSERT(array->q_locks == NULL && array->q_cnt == 0);
523                 OBD_ALLOC(array->q_locks,
524                           sizeof(*array->q_locks) * array->q_max);
525                 if (array->q_locks == NULL) {
526                         array->q_max = 0;
527                         RETURN(-ENOMEM);
528                 }
529
530                 goto again;
531         }
532         RETURN(0);
533 }
534
535 /*
536  * Send glimpse callback to slaves holding a lock on resource \res.
537  * This is used to notify slaves of new quota settings or to claim quota space
538  * back.
539  *
540  * \param env  - is the environment passed by the caller
541  * \param qmt  - is the quota master target
542  * \param res  - is the dlm resource associated with the quota object
543  * \param desc - is the glimpse descriptor to pack in glimpse callback
544  * \param cb   - is the callback function called on every lock and determine
545  *               whether a glimpse should be issued
546  * \param arg  - is an opaq parameter passed to the callback function
547  */
548 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
549                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
550                             qmt_glimpse_cb_t cb, void *arg)
551 {
552         struct list_head *tmp, *pos;
553         struct list_head gl_list = LIST_HEAD_INIT(gl_list);
554         struct qmt_gl_lock_array locks;
555         unsigned long i;
556         int rc = 0;
557         ENTRY;
558
559         memset(&locks, 0, sizeof(locks));
560         rc = qmt_alloc_lock_array(res, &locks, cb, arg);
561         if (rc) {
562                 CERROR("%s: failed to allocate glimpse lock array (%d)\n",
563                        qmt->qmt_svname, rc);
564                 RETURN(rc);
565         }
566
567         for (i = locks.q_cnt; i > 0; i--) {
568                 struct ldlm_glimpse_work *work;
569
570                 OBD_ALLOC_PTR(work);
571                 if (work == NULL) {
572                         CERROR("%s: failed to notify a lock.\n",
573                                qmt->qmt_svname);
574                         continue;
575                 }
576
577                 list_add_tail(&work->gl_list, &gl_list);
578                 work->gl_lock  = locks.q_locks[i - 1];
579                 work->gl_flags = 0;
580                 work->gl_desc  = desc;
581
582                 locks.q_locks[i - 1] = NULL;
583                 locks.q_cnt--;
584         }
585
586         qmt_free_lock_array(&locks);
587
588         if (list_empty(&gl_list)) {
589                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
590                 RETURN(0);
591         }
592
593         /* issue glimpse callbacks to all connected slaves */
594         rc = ldlm_glimpse_locks(res, &gl_list);
595
596         list_for_each_safe(pos, tmp, &gl_list) {
597                 struct ldlm_glimpse_work *work;
598
599                 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
600
601                 list_del(&work->gl_list);
602                 CERROR("%s: failed to notify %s of new quota settings\n",
603                        qmt->qmt_svname,
604                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
605                 LDLM_LOCK_RELEASE(work->gl_lock);
606                 OBD_FREE_PTR(work);
607         }
608
609         RETURN(rc);
610 }
611
612 /*
613  * Send glimpse request to all global quota locks to push new quota setting to
614  * slaves.
615  *
616  * \param env - is the environment passed by the caller
617  * \param lqe - is the lquota entry which has new settings
618  * \param ver - is the version associated with the setting change
619  */
620 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
621                          __u64 ver)
622 {
623         struct qmt_thread_info  *qti = qmt_info(env);
624         struct qmt_pool_info    *pool = lqe2qpi(lqe);
625         struct ldlm_resource    *res = NULL;
626         ENTRY;
627
628         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
629                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
630
631         /* send glimpse callback to notify slaves of new quota settings */
632         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
633         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
634         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
635         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
636         qti->qti_gl_desc.lquota_desc.gl_time      = lqe->lqe_gracetime;
637         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
638
639         /* look up ldlm resource associated with global index */
640         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
641         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
642                                 LDLM_PLAIN, 0);
643         if (IS_ERR(res)) {
644                 /* this might happen if no slaves have enqueued global quota
645                  * locks yet */
646                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
647                              "with "DFID, PFID(&qti->qti_fid));
648                 RETURN_EXIT;
649         }
650
651         qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
652                          NULL, NULL);
653         ldlm_resource_putref(res);
654         EXIT;
655 }
656
657 /* Callback function used to select locks that should be glimpsed when
658  * broadcasting the new qunit value */
659 static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
660 {
661         struct obd_uuid *slv_uuid = arg;
662         struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
663
664         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
665                 RETURN(0);
666         RETURN(+1);
667 }
668
669 /*
670  * Send glimpse request on per-ID lock to push new qunit value to slave.
671  *
672  * \param env  - is the environment passed by the caller
673  * \param qmt  - is the quota master target device
674  * \param lqe  - is the lquota entry with the new qunit value
675  * \param uuid - is the uuid of the slave acquiring space, if any
676  */
677 static void qmt_id_lock_glimpse(const struct lu_env *env,
678                                 struct qmt_device *qmt,
679                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
680 {
681         struct qmt_thread_info  *qti = qmt_info(env);
682         struct qmt_pool_info    *pool = lqe2qpi(lqe);
683         struct ldlm_resource    *res = NULL;
684         ENTRY;
685
686         if (!lqe->lqe_enforced)
687                 RETURN_EXIT;
688
689         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
690                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
691         fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
692         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
693                                 0);
694         if (IS_ERR(res)) {
695                 /* this might legitimately happens if slaves haven't had the
696                  * opportunity to enqueue quota lock yet. */
697                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
698                              "lock "DFID, PFID(&qti->qti_fid));
699                 lqe_write_lock(lqe);
700                 if (lqe->lqe_revoke_time == 0 &&
701                     lqe->lqe_qunit == pool->qpi_least_qunit)
702                         lqe->lqe_revoke_time = ktime_get_seconds();
703                 lqe_write_unlock(lqe);
704                 RETURN_EXIT;
705         }
706
707         lqe_write_lock(lqe);
708         /* The purpose of glimpse callback on per-ID lock is twofold:
709          * - notify slaves of new qunit value and hope they will release some
710          *   spare quota space in return
711          * - notify slaves that master ran out of quota space and there is no
712          *   need to send acquire request any more until further notice */
713
714         /* fill glimpse descriptor with lqe settings */
715         if (lqe->lqe_edquot)
716                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
717         else
718                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
719         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
720
721         if (lqe->lqe_revoke_time == 0 &&
722             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
723                 /* reset lqe_may_rel, it will be updated on glimpse callback
724                  * replies if needed */
725                 lqe->lqe_may_rel = 0;
726
727         /* The rebalance thread is the only thread which can issue glimpses */
728         LASSERT(!lqe->lqe_gl);
729         lqe->lqe_gl = true;
730         lqe_write_unlock(lqe);
731
732         /* issue glimpse callback to slaves */
733         qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
734                          uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
735
736         lqe_write_lock(lqe);
737         if (lqe->lqe_revoke_time == 0 &&
738             qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
739             lqe->lqe_qunit == pool->qpi_least_qunit) {
740                 lqe->lqe_revoke_time = ktime_get_seconds();
741                 qmt_adjust_edquot(lqe, ktime_get_real_seconds());
742         }
743         LASSERT(lqe->lqe_gl);
744         lqe->lqe_gl = false;
745         lqe_write_unlock(lqe);
746
747         ldlm_resource_putref(res);
748         EXIT;
749 }
750
751 /*
752  * Schedule a glimpse request on per-ID locks to push new qunit value or
753  * edquot flag to quota slaves.
754  *
755  * \param qmt  - is the quota master target device
756  * \param lqe  - is the lquota entry with the new qunit value
757  */
758 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
759 {
760         bool    added = false;
761         ENTRY;
762
763         lqe_getref(lqe);
764         spin_lock(&qmt->qmt_reba_lock);
765         if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
766                 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
767                 added = true;
768         }
769         spin_unlock(&qmt->qmt_reba_lock);
770
771         if (added)
772                 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
773         else
774                 lqe_putref(lqe);
775         EXIT;
776 }
777
778 /*
779  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
780  * quota locks owned by slaves in order to notify them of:
781  * - a qunit shrink in which case slaves might release quota space back in
782  *   glimpse reply.
783  * - set/clear edquot flag used to cache the "quota exhausted" state of the
784  *   master. When the flag is set, slaves know that there is no need to
785  *   try to acquire quota from the master since this latter has already
786  *   distributed all the space.
787  */
788 static int qmt_reba_thread(void *arg)
789 {
790         struct qmt_device       *qmt = (struct qmt_device *)arg;
791         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
792         struct l_wait_info       lwi = { 0 };
793         struct lu_env           *env;
794         struct lquota_entry     *lqe, *tmp;
795         int                      rc;
796         ENTRY;
797
798         OBD_ALLOC_PTR(env);
799         if (env == NULL)
800                 RETURN(-ENOMEM);
801
802         rc = lu_env_init(env, LCT_MD_THREAD);
803         if (rc) {
804                 CERROR("%s: failed to init env.", qmt->qmt_svname);
805                 OBD_FREE_PTR(env);
806                 RETURN(rc);
807         }
808
809         thread_set_flags(thread, SVC_RUNNING);
810         wake_up(&thread->t_ctl_waitq);
811
812         while (1) {
813                 l_wait_event(thread->t_ctl_waitq,
814                              !list_empty(&qmt->qmt_reba_list) ||
815                              !thread_is_running(thread), &lwi);
816
817                 spin_lock(&qmt->qmt_reba_lock);
818                 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
819                                          lqe_link) {
820                         list_del_init(&lqe->lqe_link);
821                         spin_unlock(&qmt->qmt_reba_lock);
822
823                         if (thread_is_running(thread))
824                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
825
826                         lqe_putref(lqe);
827                         spin_lock(&qmt->qmt_reba_lock);
828                 }
829                 spin_unlock(&qmt->qmt_reba_lock);
830
831                 if (!thread_is_running(thread))
832                         break;
833         }
834         lu_env_fini(env);
835         OBD_FREE_PTR(env);
836         thread_set_flags(thread, SVC_STOPPED);
837         wake_up(&thread->t_ctl_waitq);
838         RETURN(rc);
839 }
840
841 /*
842  * Start rebalance thread. Called when the QMT is being setup
843  */
844 int qmt_start_reba_thread(struct qmt_device *qmt)
845 {
846         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
847         struct l_wait_info       lwi    = { 0 };
848         struct task_struct              *task;
849         ENTRY;
850
851         task = kthread_run(qmt_reba_thread, (void *)qmt,
852                                "qmt_reba_%s", qmt->qmt_svname);
853         if (IS_ERR(task)) {
854                 CERROR("%s: failed to start rebalance thread (%ld)\n",
855                        qmt->qmt_svname, PTR_ERR(task));
856                 thread_set_flags(thread, SVC_STOPPED);
857                 RETURN(PTR_ERR(task));
858         }
859
860         l_wait_event(thread->t_ctl_waitq,
861                      thread_is_running(thread) || thread_is_stopped(thread),
862                      &lwi);
863
864         RETURN(0);
865 }
866
867 /*
868  * Stop rebalance thread. Called when the QMT is about to shutdown.
869  */
870 void qmt_stop_reba_thread(struct qmt_device *qmt)
871 {
872         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
873
874         if (!thread_is_stopped(thread)) {
875                 struct l_wait_info lwi = { 0 };
876
877                 thread_set_flags(thread, SVC_STOPPING);
878                 wake_up(&thread->t_ctl_waitq);
879
880                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
881                              &lwi);
882         }
883         LASSERT(list_empty(&qmt->qmt_reba_list));
884 }