Whamcloud - gitweb
LU-1842 quota: add quota locks support on QMT
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
39
40 #include "qmt_internal.h"
41
42 /* intent policy function called from mdt_intent_opc() when the intent is of
43  * quota type */
44 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
45                       struct ptlrpc_request *req, struct ldlm_lock **lockp,
46                       int flags)
47 {
48         struct qmt_device       *qmt = lu2qmt_dev(ld);
49         struct ldlm_intent      *it;
50         struct quota_body       *reqbody;
51         struct quota_body       *repbody;
52         struct obd_uuid         *uuid;
53         struct lquota_lvb       *lvb;
54         int                      rc;
55         ENTRY;
56
57         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
58
59         /* extract quota body and intent opc */
60         it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
61         if (it == NULL)
62                 RETURN(err_serious(-EFAULT));
63
64         reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
65         if (reqbody == NULL)
66                 RETURN(err_serious(-EFAULT));
67
68         /* prepare reply */
69         rc = req_capsule_server_pack(&req->rq_pill);
70         if (rc != 0) {
71                 CERROR("Can't pack response, rc %d\n", rc);
72                 RETURN(err_serious(rc));
73         }
74
75         repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
76         if (repbody == NULL)
77                 RETURN(err_serious(-EFAULT));
78
79         uuid = &(*lockp)->l_export->exp_client_uuid;
80         switch (it->opc) {
81
82         case IT_QUOTA_DQACQ:
83                 /* XXX: to be added in a next patch */
84                 GOTO(out, -EOPNOTSUPP);
85                 break;
86
87         case IT_QUOTA_CONN:
88                 /* new connection from slave */
89                 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
90                                        &repbody->qb_slv_fid,
91                                        &repbody->qb_slv_ver, uuid);
92                 if (rc)
93                         GOTO(out, rc);
94                 break;
95
96         default:
97                 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
98                        it->opc);
99                 GOTO(out, rc = err_serious(-EINVAL));
100         }
101
102         /* on success, pack lvb in reply */
103         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
104                              ldlm_lvbo_size(*lockp));
105         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
106         ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
107         EXIT;
108 out:
109         return rc;
110 }
111
112 /*
113  * Initialize quota LVB associated with quota indexes.
114  * Called with res->lr_lvb_sem held
115  */
116 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
117 {
118         struct lu_env           *env;
119         struct qmt_thread_info  *qti;
120         struct qmt_device       *qmt = lu2qmt_dev(ld);
121         int                      pool_id, pool_type, qtype;
122         int                      rc;
123         ENTRY;
124
125         LASSERT(res != NULL);
126
127         if (res->lr_type != LDLM_PLAIN)
128                 RETURN(-ENOTSUPP);
129
130         if (res->lr_lvb_data ||
131             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
132                 RETURN(0);
133
134         OBD_ALLOC_PTR(env);
135         if (env == NULL)
136                 RETURN(-ENOMEM);
137
138         /* initialize environment */
139         rc = lu_env_init(env, LCT_MD_THREAD);
140         if (rc) {
141                 OBD_FREE_PTR(env);
142                 RETURN(rc);
143         }
144         qti = qmt_info(env);
145
146         /* extract global index FID and quota identifier */
147         fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
148
149         /* sanity check the global index FID */
150         rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
151         if (rc) {
152                 CERROR("can't extract pool information from FID "DFID"\n",
153                        PFID(&qti->qti_fid));
154                 GOTO(out, rc);
155         }
156
157         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
158                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
159                  * we are thus dealing with an ID lock. */
160                 struct lquota_entry     *lqe;
161
162                 /* Find the quota entry associated with the quota id */
163                 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
164                                           &qti->qti_id);
165                 if (IS_ERR(lqe))
166                         GOTO(out, rc = PTR_ERR(lqe));
167
168                 /* store reference to lqe in lr_lvb_data */
169                 res->lr_lvb_data = lqe;
170                 LQUOTA_DEBUG(lqe, "initialized res lvb");
171         } else {
172                 struct dt_object        *obj;
173
174                 /* lookup global index */
175                 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
176                 if (IS_ERR(obj))
177                         GOTO(out, rc = PTR_ERR(obj));
178                 if (!dt_object_exists(obj)) {
179                         lu_object_put(env, &obj->do_lu);
180                         GOTO(out, rc = -ENOENT);
181                 }
182
183                 /* store reference to global index object in lr_lvb_data */
184                 res->lr_lvb_data = obj;
185                 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
186         }
187
188         res->lr_lvb_len  = sizeof(struct lquota_lvb);
189         EXIT;
190 out:
191         lu_env_fini(env);
192         OBD_FREE_PTR(env);
193         return rc;
194 }
195
196 /*
197  * Update LVB associated with the global quota index.
198  * This function is called from the DLM itself after a glimpse callback, in this
199  * case valid ptlrpc request is passed.
200  */
201 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
202                     struct ptlrpc_request *req, int increase_only)
203 {
204         struct lu_env           *env;
205         struct qmt_thread_info  *qti;
206         struct qmt_device       *qmt = lu2qmt_dev(ld);
207         struct lquota_entry     *lqe;
208         struct lquota_lvb       *lvb;
209         int                      rc = 0;
210         ENTRY;
211
212         LASSERT(res != NULL);
213
214         if (req == NULL)
215                 RETURN(0);
216
217         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
218                 /* no need to update lvb for global quota locks */
219                 RETURN(0);
220
221         lqe = res->lr_lvb_data;
222         LASSERT(lqe != NULL);
223
224         /* allocate environement */
225         OBD_ALLOC_PTR(env);
226         if (env == NULL)
227                 RETURN(-ENOMEM);
228
229         /* initialize environment */
230         rc = lu_env_init(env, LCT_MD_THREAD);
231         if (rc) {
232                 OBD_FREE_PTR(env);
233                 RETURN(rc);
234         }
235         qti = qmt_info(env);
236
237         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
238         if (lvb == NULL) {
239                 CERROR("%s: failed to extract lvb from request\n",
240                        qmt->qmt_svname);
241                 GOTO(out, rc);
242         }
243
244         /* XXX: Space release handling to be added in a next patch */
245
246         EXIT;
247 out:
248         lu_env_fini(env);
249         OBD_FREE_PTR(env);
250         return rc;
251 }
252
253 /*
254  * Report size of lvb to ldlm layer in order to allocate lvb buffer
255  * As far as quota locks are concerned, the size is static and is the same
256  * for both global and per-ID locks which shares the same lvb format.
257  */
258 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
259 {
260         return sizeof(struct lquota_lvb);
261 }
262
263 /*
264  * Fill request buffer with quota lvb
265  */
266 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
267                   int lvblen)
268 {
269         struct ldlm_resource    *res = lock->l_resource;
270         struct lquota_lvb       *qlvb = lvb;
271         ENTRY;
272
273         LASSERT(res != NULL);
274
275         if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
276             res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
277                 RETURN(-EINVAL);
278
279         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
280                 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
281                  * we are thus dealing with an ID lock. */
282                 struct lquota_entry     *lqe = res->lr_lvb_data;
283
284                 /* return current qunit value & edquot flags in lvb */
285                 lqe_getref(lqe);
286                 qlvb->lvb_id_qunit = lqe->lqe_qunit;
287                 qlvb->lvb_flags = 0;
288                 if (lqe->lqe_edquot)
289                         qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
290                 lqe_putref(lqe);
291         } else {
292                 /* global quota lock */
293                 struct lu_env           *env;
294                 int                      rc;
295                 struct dt_object        *obj = res->lr_lvb_data;
296
297                 OBD_ALLOC_PTR(env);
298                 if (env == NULL)
299                         RETURN(-ENOMEM);
300
301                 /* initialize environment */
302                 rc = lu_env_init(env, LCT_LOCAL);
303                 if (rc) {
304                         OBD_FREE_PTR(env);
305                         RETURN(rc);
306                 }
307
308                 /* return current version of global index */
309                 qlvb->lvb_glb_ver = dt_version_get(env, obj);
310
311                 lu_env_fini(env);
312                 OBD_FREE_PTR(env);
313         }
314
315         RETURN(sizeof(struct lquota_lvb));
316 }
317
318 /*
319  * Free lvb associated with a given ldlm resource
320  * we don't really allocate a lvb, lr_lvb_data just points to
321  * the appropriate backend structures.
322  */
323 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
324 {
325         ENTRY;
326
327         if (res->lr_lvb_data == NULL)
328                 RETURN(0);
329
330         if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
331                 struct lquota_entry     *lqe = res->lr_lvb_data;
332
333                 /* release lqe reference */
334                 lqe_putref(lqe);
335         } else {
336                 struct dt_object        *obj = res->lr_lvb_data;
337                 struct lu_env           *env;
338                 int                      rc;
339
340                 OBD_ALLOC_PTR(env);
341                 if (env == NULL)
342                         RETURN(-ENOMEM);
343
344                 /* initialize environment */
345                 rc = lu_env_init(env, LCT_LOCAL);
346                 if (rc) {
347                         OBD_FREE_PTR(env);
348                         RETURN(rc);
349                 }
350
351                 /* release object reference */
352                 lu_object_put(env, &obj->do_lu);
353                 lu_env_fini(env);
354                 OBD_FREE_PTR(env);
355         }
356
357         res->lr_lvb_data = NULL;
358         res->lr_lvb_len  = 0;
359
360         RETURN(0);
361 }
362
363 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
364                                 struct obd_uuid *, union ldlm_gl_desc *,
365                                 void *);
366 /*
367  * Send glimpse callback to slaves holding a lock on resource \res.
368  * This is used to notify slaves of new quota settings or to claim quota space
369  * back.
370  *
371  * \param env  - is the environment passed by the caller
372  * \param qmt  - is the quota master target
373  * \param res  - is the dlm resource associated with the quota object
374  * \param desc - is the glimpse descriptor to pack in glimpse callback
375  * \param cb   - is the callback function called on every lock and determine
376  *               whether a glimpse should be issued
377  * \param arg  - is an opaq parameter passed to the callback function
378  */
379 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
380                             struct ldlm_resource *res, union ldlm_gl_desc *desc,
381                             qmt_glimpse_cb_t cb, void *arg)
382 {
383         cfs_list_t      *tmp, *pos;
384         CFS_LIST_HEAD(gl_list);
385         int              rc = 0;
386         ENTRY;
387
388         lock_res(res);
389         /* scan list of granted locks */
390         cfs_list_for_each(pos, &res->lr_granted) {
391                 struct ldlm_glimpse_work        *work;
392                 struct ldlm_lock                *lock;
393                 struct obd_uuid                 *uuid;
394
395                 lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
396                 LASSERT(lock->l_export);
397                 uuid = &lock->l_export->exp_client_uuid;
398
399                 if (cb != NULL) {
400                         rc = cb(env, qmt, uuid, desc, arg);
401                         if (rc == 0)
402                                 /* slave should not be notified */
403                                 continue;
404                         if (rc < 0)
405                                 /* something wrong happened, we still notify */
406                                 CERROR("%s: callback function failed to "
407                                        "determine whether slave %s should be "
408                                        "notified (%d)\n", qmt->qmt_svname,
409                                        obd_uuid2str(uuid), rc);
410                 }
411
412                 OBD_ALLOC_PTR(work);
413                 if (work == NULL) {
414                         CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
415                                obd_uuid2str(uuid));
416                         continue;
417                 }
418
419                 cfs_list_add_tail(&work->gl_list, &gl_list);
420                 work->gl_lock  = LDLM_LOCK_GET(lock);
421                 work->gl_flags = 0;
422                 work->gl_desc  = desc;
423
424         }
425         unlock_res(res);
426
427         if (cfs_list_empty(&gl_list)) {
428                 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
429                 RETURN(0);
430         }
431
432         /* issue glimpse callbacks to all connected slaves */
433         rc = ldlm_glimpse_locks(res, &gl_list);
434
435         cfs_list_for_each_safe(pos, tmp, &gl_list) {
436                 struct ldlm_glimpse_work *work;
437
438                 work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
439
440                 cfs_list_del(&work->gl_list);
441                 CERROR("%s: failed to notify %s of new quota settings\n",
442                        qmt->qmt_svname,
443                        obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
444                 LDLM_LOCK_RELEASE(work->gl_lock);
445                 OBD_FREE_PTR(work);
446         }
447
448         RETURN(rc);
449 }
450
451 /*
452  * Send glimpse request to all global quota locks to push new quota setting to
453  * slaves.
454  *
455  * \param env - is the environment passed by the caller
456  * \param lqe - is the lquota entry which has new settings
457  * \param ver - is the version associated with the setting change
458  */
459 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
460                          __u64 ver)
461 {
462         struct qmt_thread_info  *qti = qmt_info(env);
463         struct qmt_pool_info    *pool = lqe2qpi(lqe);
464         struct ldlm_resource    *res = NULL;
465         int                      rc;
466         ENTRY;
467
468         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
469                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
470
471         /* send glimpse callback to notify slaves of new quota settings */
472         qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
473         qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
474         qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
475         qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
476         qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
477
478         /* look up ldlm resource associated with global index */
479         fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
480         res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
481                                 LDLM_PLAIN, 0);
482         if (res == NULL) {
483                 /* this might happen if no slaves have enqueued global quota
484                  * locks yet */
485                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
486                              "with "DFID, PFID(&qti->qti_fid));
487                 RETURN_EXIT;
488         }
489
490         rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
491                               NULL, NULL);
492         ldlm_resource_putref(res);
493         EXIT;
494 }
495
496 /* Callback function used to select locks that should be glimpsed when
497  * broadcasting the new qunit value */
498 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
499                           struct obd_uuid *uuid, union ldlm_gl_desc *desc,
500                           void *arg)
501 {
502         struct obd_uuid *slv_uuid = arg;
503
504         if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
505                 RETURN(0);
506         RETURN(+1);
507 }
508
509 /*
510  * Send glimpse request on per-ID lock to push new qunit value to slave.
511  *
512  * \param env  - is the environment passed by the caller
513  * \param qmt  - is the quota master target device
514  * \param lqe  - is the lquota entry with the new qunit value
515  * \param uuid - is the uuid of the slave acquiring space, if any
516  */
517 static void qmt_id_lock_glimpse(const struct lu_env *env,
518                                 struct qmt_device *qmt,
519                                 struct lquota_entry *lqe, struct obd_uuid *uuid)
520 {
521         struct qmt_thread_info  *qti = qmt_info(env);
522         struct qmt_pool_info    *pool = lqe2qpi(lqe);
523         struct ldlm_resource    *res = NULL;
524         int                      rc;
525         ENTRY;
526
527         if (!lqe->lqe_enforced)
528                 RETURN_EXIT;
529
530         lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
531                             pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
532         fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
533         res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
534                                 0);
535         if (res == NULL) {
536                 /* this might legitimately happens if slaves haven't had the
537                  * opportunity to enqueue quota lock yet. */
538                 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
539                              "lock "DFID, PFID(&qti->qti_fid));
540                 RETURN_EXIT;
541         }
542
543         lqe_read_lock(lqe);
544         /* The purpose of glimpse callback on per-ID lock is twofold:
545          * - notify slaves of new qunit value and hope they will release some
546          *   spare quota space in return
547          * - notify slaves that master ran out of quota space and there is no
548          *   need to send acquire request any more until further notice */
549
550         /* fill glimpse descriptor with lqe settings */
551         if (lqe->lqe_edquot)
552                 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
553         else
554                 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
555         qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
556         lqe_read_unlock(lqe);
557
558         /* The rebalance thread is the only thread which can issue glimpses */
559         LASSERT(!lqe->lqe_gl);
560         lqe->lqe_gl = true;
561
562         /* issue glimpse callback to slaves */
563         rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
564                               uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
565
566         LASSERT(lqe->lqe_gl);
567         lqe->lqe_gl = false;
568
569         ldlm_resource_putref(res);
570         EXIT;
571 }
572
573 /*
574  * Schedule a glimpse request on per-ID locks to push new qunit value or
575  * edquot flag to quota slaves.
576  *
577  * \param qmt  - is the quota master target device
578  * \param lqe  - is the lquota entry with the new qunit value
579  */
580 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
581 {
582         bool    added = false;
583         ENTRY;
584
585         lqe_getref(lqe);
586         cfs_spin_lock(&qmt->qmt_reba_lock);
587         if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
588                 cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
589                 added = true;
590         }
591         cfs_spin_unlock(&qmt->qmt_reba_lock);
592
593         if (added)
594                 cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
595         else
596                 lqe_putref(lqe);
597         EXIT;
598 }
599
600 /*
601  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
602  * quota locks owned by slaves in order to notify them of:
603  * - a qunit shrink in which case slaves might release quota space back in
604  *   glimpse reply.
605  * - set/clear edquot flag used to cache the "quota exhausted" state of the
606  *   master. When the flag is set, slaves know that there is no need to
607  *   try to acquire quota from the master since this latter has already
608  *   distributed all the space.
609  */
610 static int qmt_reba_thread(void *arg)
611 {
612         struct qmt_device       *qmt = (struct qmt_device *)arg;
613         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
614         struct l_wait_info       lwi = { 0 };
615         struct lu_env           *env;
616         struct lquota_entry     *lqe, *tmp;
617         char                     pname[MTI_NAME_MAXLEN];
618         int                      rc;
619         ENTRY;
620
621         OBD_ALLOC_PTR(env);
622         if (env == NULL)
623                 RETURN(-ENOMEM);
624
625         rc = lu_env_init(env, LCT_MD_THREAD);
626         if (rc) {
627                 CERROR("%s: failed to init env.", qmt->qmt_svname);
628                 OBD_FREE_PTR(env);
629                 RETURN(rc);
630         }
631
632         snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
633         cfs_daemonize(pname);
634
635         thread_set_flags(thread, SVC_RUNNING);
636         cfs_waitq_signal(&thread->t_ctl_waitq);
637
638         while (1) {
639                 l_wait_event(thread->t_ctl_waitq,
640                              !cfs_list_empty(&qmt->qmt_reba_list) ||
641                              !thread_is_running(thread), &lwi);
642
643                 cfs_spin_lock(&qmt->qmt_reba_lock);
644                 cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
645                                              lqe_link) {
646                         cfs_list_del_init(&lqe->lqe_link);
647                         cfs_spin_unlock(&qmt->qmt_reba_lock);
648
649                         if (thread_is_running(thread))
650                                 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
651
652                         lqe_putref(lqe);
653                         cfs_spin_lock(&qmt->qmt_reba_lock);
654                 }
655                 cfs_spin_unlock(&qmt->qmt_reba_lock);
656
657                 if (!thread_is_running(thread))
658                         break;
659         }
660         lu_env_fini(env);
661         OBD_FREE_PTR(env);
662         thread_set_flags(thread, SVC_STOPPED);
663         cfs_waitq_signal(&thread->t_ctl_waitq);
664         RETURN(rc);
665 }
666
667 /*
668  * Start rebalance thread. Called when the QMT is being setup
669  */
670 int qmt_start_reba_thread(struct qmt_device *qmt)
671 {
672         struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
673         struct l_wait_info       lwi    = { 0 };
674         int                      rc;
675         ENTRY;
676
677         rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
678         if (rc < 0) {
679                 CERROR("%s: failed to start rebalance thread (%d)\n",
680                        qmt->qmt_svname, rc);
681                 thread_set_flags(thread, SVC_STOPPED);
682                 RETURN(rc);
683         }
684
685         l_wait_event(thread->t_ctl_waitq,
686                      thread_is_running(thread) || thread_is_stopped(thread),
687                      &lwi);
688
689         RETURN(0);
690 }
691
692 /*
693  * Stop rebalance thread. Called when the QMT is about to shutdown.
694  */
695 void qmt_stop_reba_thread(struct qmt_device *qmt)
696 {
697         struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
698
699         if (!thread_is_stopped(thread)) {
700                 struct l_wait_info lwi = { 0 };
701
702                 thread_set_flags(thread, SVC_STOPPING);
703                 cfs_waitq_signal(&thread->t_ctl_waitq);
704
705                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
706                              &lwi);
707         }
708         LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
709 }