4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
31 #define DEBUG_SUBSYSTEM S_LQUOTA
33 #include <linux/kthread.h>
35 #include <lustre_dlm.h>
36 #include <lustre_swab.h>
37 #include <obd_class.h>
39 #include "qmt_internal.h"
/*
 * Intent policy for quota locks.  Handles the two quota intents carried in an
 * LDLM enqueue: IT_QUOTA_DQACQ (acquire/release quota space against an lqe)
 * and the new-slave connection intent handled via qmt_pool_new_conn().
 * On success the current LVB is packed in the reply buffer.
 *
 * NOTE(review): this excerpt elides several physical lines (local declarations
 * of rc/lvb_len, NULL checks following each req_capsule_*_get(), switch/case
 * framing and closing braces); comments below only describe visible calls.
 */
41 /* intent policy function called from mdt_intent_opc() when the intent is of
43 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
44 struct ptlrpc_request *req, struct ldlm_lock **lockp,
47 struct qmt_device *qmt = lu2qmt_dev(ld);
48 struct ldlm_intent *it;
49 struct quota_body *reqbody;
50 struct quota_body *repbody;
51 struct obd_uuid *uuid;
52 struct lquota_lvb *lvb;
53 struct ldlm_resource *res = (*lockp)->l_resource;
54 struct ldlm_reply *ldlm_rep;
/* switch the capsule to the quota-intent request format and size the
 * server-side LVB buffer before packing the reply */
58 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60 ldlm_lvbo_size(*lockp));
62 /* extract quota body and intent opc */
63 it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
65 RETURN(err_serious(-EFAULT));
67 reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
69 RETURN(err_serious(-EFAULT));
72 rc = req_capsule_server_pack(&req->rq_pill);
74 CERROR("Can't pack response, rc %d\n", rc);
75 RETURN(err_serious(rc));
78 repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
80 RETURN(err_serious(-EFAULT));
82 ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
84 RETURN(err_serious(-EFAULT));
/* identity of the requesting slave, used for per-slave accounting */
86 uuid = &(*lockp)->l_export->exp_client_uuid;
89 case IT_QUOTA_DQACQ: {
90 struct lquota_entry *lqe;
91 struct ldlm_lock *lock;
/* DQACQ is only valid on a per-ID resource (non-zero quota seq) */
93 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
94 /* acquire on global lock? something is wrong ... */
95 GOTO(out, rc = -EPROTO);
97 /* verify global lock isn't stale */
98 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
99 GOTO(out, rc = -ENOLCK);
101 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
103 GOTO(out, rc = -ENOLCK);
/* lqe reference was stored in lr_lvb_data by qmt_lvbo_init() */
106 lqe = res->lr_lvb_data;
107 LASSERT(lqe != NULL);
110 /* acquire quota space */
111 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
112 reqbody->qb_count, reqbody->qb_usage,
121 /* new connection from slave */
123 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
124 /* connection on per-ID lock? something is wrong ... */
125 GOTO(out, rc = -EPROTO);
/* register the slave and return the FID/version of its index copy */
127 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
128 &repbody->qb_slv_fid,
129 &repbody->qb_slv_ver, uuid);
135 CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
137 GOTO(out, rc = -EINVAL);
140 /* on success, pack lvb in reply */
141 lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
142 lvb_len = ldlm_lvbo_size(*lockp);
143 lvb_len = ldlm_lvbo_fill(env, *lockp, lvb, &lvb_len);
145 GOTO(out, rc = lvb_len);
/* shrink reply buffer to the number of bytes actually filled */
147 req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
/* intent result is returned via lock_policy_res2, serious errors cleared */
149 ldlm_rep->lock_policy_res2 = clear_serious(rc);
/*
 * Set up lr_lvb_data for a quota resource: a per-ID resource gets a reference
 * to its lquota_entry, a global resource gets a reference to the global index
 * dt_object.  Returns 0 on success, negative errno on failure.
 *
 * NOTE(review): the excerpt elides the env allocation, NULL/ERR_PTR checks
 * after lookups, the else framing and the out/cleanup labels.
 */
155 * Initialize quota LVB associated with quota indexes.
156 * Called with res->lr_lvb_sem held
158 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
161 struct qmt_thread_info *qti;
162 struct qmt_device *qmt = lu2qmt_dev(ld);
163 int pool_id, pool_type, qtype;
167 LASSERT(res != NULL);
/* quota locks are PLAIN locks; nothing to do for other resource types */
169 if (res->lr_type != LDLM_PLAIN)
/* already initialized, or not a quota-global resource */
172 if (res->lr_lvb_data ||
173 res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
180 /* initialize environment */
181 rc = lu_env_init(env, LCT_MD_THREAD);
186 /* extract global index FID and quota identifier */
187 fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
189 /* sanity check the global index FID */
190 rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
192 CERROR("can't extract pool information from FID "DFID"\n",
193 PFID(&qti->qti_fid));
197 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
198 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
199 * we are thus dealing with an ID lock. */
200 struct lquota_entry *lqe;
202 /* Find the quota entry associated with the quota id */
203 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
206 GOTO(out, rc = PTR_ERR(lqe));
208 /* store reference to lqe in lr_lvb_data */
209 res->lr_lvb_data = lqe;
210 LQUOTA_DEBUG(lqe, "initialized res lvb");
/* global lock: lr_lvb_data references the global index object instead */
212 struct dt_object *obj;
214 /* lookup global index */
215 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
217 GOTO(out, rc = PTR_ERR(obj));
218 if (!dt_object_exists(obj)) {
219 dt_object_put(env, obj);
220 GOTO(out, rc = -ENOENT);
223 /* store reference to global index object in lr_lvb_data */
224 res->lr_lvb_data = obj;
225 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
/* LVB size is fixed for quota locks, see qmt_lvbo_size() */
228 res->lr_lvb_len = sizeof(struct lquota_lvb);
/*
 * Process an LVB carried by a glimpse callback reply from a slave: release
 * the quota space the slave reported in lvb_id_rel (via qmt_dqacq0 with
 * QUOTA_DQACQ_FL_REL) and accumulate lvb_id_may_rel for later rebalancing.
 * Only per-ID resources are updated; global quota locks are skipped.
 *
 * NOTE(review): env allocation, lqe locking around lqe_may_rel, NULL checks
 * and the out_* cleanup labels are elided from this excerpt.
 */
238 * Update LVB associated with the global quota index.
239 * This function is called from the DLM itself after a glimpse callback, in this
240 * case valid ptlrpc request is passed.
242 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
243 struct ptlrpc_request *req, int increase_only)
246 struct qmt_thread_info *qti;
247 struct qmt_device *qmt = lu2qmt_dev(ld);
248 struct lquota_entry *lqe;
249 struct lquota_lvb *lvb;
250 struct ldlm_lock *lock;
251 struct obd_export *exp;
255 LASSERT(res != NULL);
260 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
261 /* no need to update lvb for global quota locks */
/* lvb comes from the wire, swab it on extraction */
264 lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
265 lustre_swab_lquota_lvb);
267 CERROR("%s: failed to extract lvb from request\n",
/* lqe reference stored by qmt_lvbo_init() for per-ID resources */
272 lqe = res->lr_lvb_data;
273 LASSERT(lqe != NULL);
276 LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
277 lvb->lvb_id_rel, lvb->lvb_id_may_rel);
279 if (lvb->lvb_id_rel == 0) {
280 /* nothing to release */
281 if (lvb->lvb_id_may_rel != 0)
282 /* but might still release later ... */
283 lqe->lqe_may_rel += lvb->lvb_id_may_rel;
284 GOTO(out_lqe, rc = 0);
287 /* allocate environement */
290 GOTO(out_lqe, rc = -ENOMEM);
292 /* initialize environment */
293 rc = lu_env_init(env, LCT_MD_THREAD);
298 /* The request is a glimpse callback which was sent via the
299 * reverse import to the slave. What we care about here is the
300 * export associated with the slave and req->rq_export is
301 * definitely not what we are looking for (it is actually set to
303 * Therefore we extract the lock from the request argument
304 * and use lock->l_export. */
305 lock = ldlm_request_lock(req);
307 CERROR("%s: failed to get lock from request!\n",
309 GOTO(out_env_init, rc = PTR_ERR(lock));
/* take our own export reference for the duration of the release */
312 exp = class_export_get(lock->l_export);
314 CERROR("%s: failed to get export from lock!\n",
316 GOTO(out_env_init, rc = -EFAULT);
319 /* release quota space */
320 rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
321 QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
/* a short release is unexpected: log the mismatch but don't retry here */
322 if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
323 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
324 "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
325 lvb->lvb_id_rel, rc);
326 class_export_put(exp);
328 GOTO(out_env_init, rc);
/*
 * \param ld   - quota master lu_device (unused here)
 * \param lock - lock the LVB buffer is being allocated for (unused here)
 * \retval constant size of struct lquota_lvb
 */
340 * Report size of lvb to ldlm layer in order to allocate lvb buffer
341 * As far as quota locks are concerned, the size is static and is the same
342 * for both global and per-ID locks which shares the same lvb format.
344 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
346 return sizeof(struct lquota_lvb);
/*
 * Fill the reply LVB for a quota lock.  Per-ID locks report the current
 * qunit value and the edquot flag from the lqe; global locks report the
 * version of the global index object.  Returns the number of bytes written
 * (sizeof(struct lquota_lvb)) on success.
 *
 * NOTE(review): env setup/teardown, lqe read locking and the early-return
 * path for non-quota resources are elided from this excerpt.
 */
350 * Fill request buffer with quota lvb
352 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
355 struct ldlm_resource *res = lock->l_resource;
356 struct lquota_lvb *qlvb = lvb;
359 LASSERT(res != NULL);
/* only quota-global PLAIN resources with initialized lvb data qualify */
361 if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
362 res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
365 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
366 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
367 * we are thus dealing with an ID lock. */
368 struct lquota_entry *lqe = res->lr_lvb_data;
370 /* return current qunit value & edquot flags in lvb */
372 qlvb->lvb_id_qunit = lqe->lqe_qunit;
375 qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
378 /* global quota lock */
381 struct dt_object *obj = res->lr_lvb_data;
387 /* initialize environment */
388 rc = lu_env_init(env, LCT_LOCAL);
394 /* return current version of global index */
395 qlvb->lvb_glb_ver = dt_version_get(env, obj);
/* caller gets the number of lvb bytes filled */
401 RETURN(sizeof(struct lquota_lvb));
/*
 * Drop the reference held via lr_lvb_data when a quota resource is freed:
 * an lqe reference for per-ID resources, a dt_object reference for global
 * ones.  No buffer is actually freed — see the comment below.
 *
 * NOTE(review): lqe_putref() call, env handling and error paths are elided
 * from this excerpt.
 */
405 * Free lvb associated with a given ldlm resource
406 * we don't really allocate a lvb, lr_lvb_data just points to
407 * the appropriate backend structures.
409 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
/* nothing was attached to this resource */
413 if (res->lr_lvb_data == NULL)
416 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
417 struct lquota_entry *lqe = res->lr_lvb_data;
419 /* release lqe reference */
422 struct dt_object *obj = res->lr_lvb_data;
430 /* initialize environment */
431 rc = lu_env_init(env, LCT_LOCAL);
437 /* release object reference */
438 dt_object_put(env, obj);
/* prevent any further use of the stale pointer */
443 res->lr_lvb_data = NULL;
/* per-lock filter callback used by qmt_glimpse_lock()/qmt_alloc_lock_array();
 * returns non-zero to skip the lock, 0 to include it (see qmt_id_lock_cb) */
449 typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
/* growable snapshot of granted locks to glimpse; q_locks holds q_cnt
 * referenced locks in a q_max-sized array (count fields elided here) */
451 struct qmt_gl_lock_array {
454 struct ldlm_lock **q_locks;
/*
 * Release every lock reference held in \a array and free the array storage.
 * Safe on a never-populated array (q_max == 0 implies q_locks == NULL).
 */
457 static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
461 if (array->q_max == 0) {
462 LASSERT(array->q_locks == NULL);
/* drop the LDLM_LOCK_GET() references taken in qmt_alloc_lock_array() */
466 for (i = 0; i < array->q_cnt; i++) {
467 LASSERT(array->q_locks[i]);
468 LDLM_LOCK_RELEASE(array->q_locks[i]);
469 array->q_locks[i] = NULL;
472 OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks));
473 array->q_locks = NULL;
/*
 * Snapshot the granted locks of \a res that pass the optional filter \a cb
 * into \a array, taking a reference on each.  Because allocation cannot be
 * done under the resource lock, the scan is retried with a larger array
 * (count + count/2 + 10) whenever more qualifying locks are found than fit.
 *
 * NOTE(review): the retry loop framing, resource lock/unlock and the cb()
 * invocation line are elided from this excerpt.
 */
477 static int qmt_alloc_lock_array(struct ldlm_resource *res,
478 struct qmt_gl_lock_array *array,
479 qmt_glimpse_cb_t cb, void *arg)
481 struct list_head *pos;
482 unsigned long count = 0;
/* caller must pass a zeroed array */
486 LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
489 /* scan list of granted locks */
490 list_for_each(pos, &res->lr_granted) {
491 struct ldlm_lock *lock;
494 lock = list_entry(pos, struct ldlm_lock, l_res_link);
495 LASSERT(lock->l_export);
499 /* slave should not be notified */
/* record the lock only when a slot is available; count keeps the
 * true number of qualifying locks for the resize decision below */
505 if (array->q_max != 0 && array->q_cnt < array->q_max) {
506 array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
512 if (count > array->q_max) {
513 qmt_free_lock_array(array);
517 * allocate more slots in case of more qualified locks are
518 * found during next loop
520 array->q_max = count + count / 2 + 10;
522 LASSERT(array->q_locks == NULL && array->q_cnt == 0);
523 OBD_ALLOC(array->q_locks,
524 sizeof(*array->q_locks) * array->q_max);
525 if (array->q_locks == NULL) {
/*
 * Build a glimpse work list from the snapshotted lock array and hand it to
 * ldlm_glimpse_locks().  Locks for which a work item cannot be allocated are
 * logged and skipped; items left on gl_list after the call failed to be
 * notified and their references are released here.
 *
 * NOTE(review): work-item allocation, the return paths and final RETURN(rc)
 * are elided from this excerpt.
 */
536 * Send glimpse callback to slaves holding a lock on resource \res.
537 * This is used to notify slaves of new quota settings or to claim quota space
540 * \param env - is the environment passed by the caller
541 * \param qmt - is the quota master target
542 * \param res - is the dlm resource associated with the quota object
543 * \param desc - is the glimpse descriptor to pack in glimpse callback
544 * \param cb - is the callback function called on every lock and determine
545 * whether a glimpse should be issued
546 * \param arg - is an opaq parameter passed to the callback function
548 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
549 struct ldlm_resource *res, union ldlm_gl_desc *desc,
550 qmt_glimpse_cb_t cb, void *arg)
552 struct list_head *tmp, *pos;
553 struct list_head gl_list = LIST_HEAD_INIT(gl_list);
554 struct qmt_gl_lock_array locks;
559 memset(&locks, 0, sizeof(locks));
560 rc = qmt_alloc_lock_array(res, &locks, cb, arg);
562 CERROR("%s: failed to allocate glimpse lock array (%d)\n",
563 qmt->qmt_svname, rc);
/* transfer each lock reference from the array into a glimpse work item */
567 for (i = locks.q_cnt; i > 0; i--) {
568 struct ldlm_glimpse_work *work;
572 CERROR("%s: failed to notify a lock.\n",
577 list_add_tail(&work->gl_list, &gl_list);
578 work->gl_lock = locks.q_locks[i - 1];
580 work->gl_desc = desc;
/* ownership moved to the work item; clear the slot so
 * qmt_free_lock_array() won't release it again */
582 locks.q_locks[i - 1] = NULL;
586 qmt_free_lock_array(&locks);
588 if (list_empty(&gl_list)) {
589 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
593 /* issue glimpse callbacks to all connected slaves */
594 rc = ldlm_glimpse_locks(res, &gl_list);
/* any entries still on the list were not glimpsed successfully */
596 list_for_each_safe(pos, tmp, &gl_list) {
597 struct ldlm_glimpse_work *work;
599 work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
601 list_del(&work->gl_list);
602 CERROR("%s: failed to notify %s of new quota settings\n",
604 obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
605 LDLM_LOCK_RELEASE(work->gl_lock);
/*
 * Broadcast new quota settings (limits, grace time, version) for \a lqe to
 * all slaves holding the matching global quota lock.  For a default-quota
 * entry, zero limits plus the LQUOTA_FLAG_DEFAULT grace flag are sent.
 * Absence of the ldlm resource simply means no slave enqueued the global
 * lock yet and is not an error.
 */
613 * Send glimpse request to all global quota locks to push new quota setting to
616 * \param env - is the environment passed by the caller
617 * \param lqe - is the lquota entry which has new settings
618 * \param ver - is the version associated with the setting change
620 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
623 struct qmt_thread_info *qti = qmt_info(env);
624 struct qmt_pool_info *pool = lqe2qpi(lqe);
625 struct ldlm_resource *res = NULL;
/* qpi_key packs pool id in the low 16 bits and pool type above them */
628 lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
629 pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
631 /* send glimpse callback to notify slaves of new quota settings */
632 qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id;
633 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
634 if (lqe->lqe_is_default) {
635 qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
636 qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
637 qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
638 LQUOTA_FLAG_DEFAULT);
641 qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
642 qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
643 qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
645 qti->qti_gl_desc.lquota_desc.gl_ver = ver;
647 /* look up ldlm resource associated with global index */
648 fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
649 res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
652 /* this might happen if no slaves have enqueued global quota
654 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
655 "with "DFID, PFID(&qti->qti_fid));
659 qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
661 ldlm_resource_putref(res);
/*
 * qmt_glimpse_cb_t filter: skip (return 0 for) the lock belonging to the
 * slave identified by \a arg, so the slave that triggered the broadcast is
 * not glimpsed back.  NOTE(review): the return statements are elided from
 * this excerpt — semantics inferred from the visible comparison only.
 */
665 /* Callback function used to select locks that should be glimpsed when
666 * broadcasting the new qunit value */
667 static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
669 struct obd_uuid *slv_uuid = arg;
670 struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
672 if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
/*
 * Glimpse the per-ID lock of \a lqe to push a new qunit value and/or the
 * edquot flag to slaves.  When the qunit has shrunk to the pool's least
 * qunit, lqe_revoke_time is armed so qmt_adjust_edquot() can later decide
 * the id is out of quota.  If \a uuid is given, that slave is excluded via
 * qmt_id_lock_cb().
 *
 * NOTE(review): lqe_write_lock() acquisitions, the early-return framing and
 * the lqe_gl set/clear statements are elided from this excerpt.
 */
678 * Send glimpse request on per-ID lock to push new qunit value to slave.
680 * \param env - is the environment passed by the caller
681 * \param qmt - is the quota master target device
682 * \param lqe - is the lquota entry with the new qunit value
683 * \param uuid - is the uuid of the slave acquiring space, if any
685 static void qmt_id_lock_glimpse(const struct lu_env *env,
686 struct qmt_device *qmt,
687 struct lquota_entry *lqe, struct obd_uuid *uuid)
689 struct qmt_thread_info *qti = qmt_info(env);
690 struct qmt_pool_info *pool = lqe2qpi(lqe);
691 struct ldlm_resource *res = NULL;
/* nothing to push for a non-enforced id */
694 if (!lqe->lqe_enforced)
697 lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
698 pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
699 fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
700 res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
703 /* this might legitimately happens if slaves haven't had the
704 * opportunity to enqueue quota lock yet. */
705 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
706 "lock "DFID, PFID(&qti->qti_fid));
/* even without a resource to glimpse, still arm the revoke timer
 * once the qunit reached the pool's minimum */
708 if (lqe->lqe_revoke_time == 0 &&
709 lqe->lqe_qunit == pool->qpi_least_qunit)
710 lqe->lqe_revoke_time = ktime_get_seconds();
711 lqe_write_unlock(lqe);
716 /* The purpose of glimpse callback on per-ID lock is twofold:
717 * - notify slaves of new qunit value and hope they will release some
718 * spare quota space in return
719 * - notify slaves that master ran out of quota space and there is no
720 * need to send acquire request any more until further notice */
722 /* fill glimpse descriptor with lqe settings */
724 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
726 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
727 qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
729 if (lqe->lqe_revoke_time == 0 &&
730 qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
731 /* reset lqe_may_rel, it will be updated on glimpse callback
732 * replies if needed */
733 lqe->lqe_may_rel = 0;
735 /* The rebalance thread is the only thread which can issue glimpses */
736 LASSERT(!lqe->lqe_gl);
738 lqe_write_unlock(lqe);
740 /* issue glimpse callback to slaves */
741 qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
742 uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
/* re-check after the glimpse: qunit may have changed while unlocked,
 * so only arm the revoke timer if it still equals the least qunit */
745 if (lqe->lqe_revoke_time == 0 &&
746 qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
747 lqe->lqe_qunit == pool->qpi_least_qunit) {
748 lqe->lqe_revoke_time = ktime_get_seconds();
749 qmt_adjust_edquot(lqe, ktime_get_real_seconds());
751 LASSERT(lqe->lqe_gl);
753 lqe_write_unlock(lqe);
755 ldlm_resource_putref(res);
/*
 * Queue \a lqe on the rebalance list (unless the QMT is stopping or the
 * entry is already queued) and wake the rebalance thread, which will issue
 * the actual per-ID glimpses asynchronously.
 * NOTE(review): the lqe reference management around the list add is elided
 * from this excerpt.
 */
760 * Schedule a glimpse request on per-ID locks to push new qunit value or
761 * edquot flag to quota slaves.
763 * \param qmt - is the quota master target device
764 * \param lqe - is the lquota entry with the new qunit value
766 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
772 spin_lock(&qmt->qmt_reba_lock);
/* only queue once; lqe_link is non-empty while already pending */
773 if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
774 list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
777 spin_unlock(&qmt->qmt_reba_lock);
/* kick the rebalance thread to process the list */
780 wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
/*
 * Main loop of the rebalance thread: wait for entries queued by
 * qmt_id_lock_notify(), pop each off qmt_reba_list under qmt_reba_lock and
 * glimpse its per-ID locks via qmt_id_lock_glimpse().  Glimpsing is skipped
 * once the thread is asked to stop so the list drains quickly on shutdown.
 *
 * NOTE(review): env allocation, lqe reference drops after processing and
 * the outer loop/exit framing are elided from this excerpt.
 */
787 * The rebalance thread is in charge of sending glimpse callbacks on per-ID
788 * quota locks owned by slaves in order to notify them of:
789 * - a qunit shrink in which case slaves might release quota space back in
791 * - set/clear edquot flag used to cache the "quota exhausted" state of the
792 * master. When the flag is set, slaves know that there is no need to
793 * try to acquire quota from the master since this latter has already
794 * distributed all the space.
796 static int qmt_reba_thread(void *arg)
798 struct qmt_device *qmt = (struct qmt_device *)arg;
799 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
800 struct l_wait_info lwi = { 0 };
802 struct lquota_entry *lqe, *tmp;
810 rc = lu_env_init(env, LCT_MD_THREAD);
812 CERROR("%s: failed to init env.", qmt->qmt_svname);
/* signal qmt_start_reba_thread() that startup completed */
817 thread_set_flags(thread, SVC_RUNNING);
818 wake_up(&thread->t_ctl_waitq);
/* sleep until work is queued or a stop is requested */
821 l_wait_event(thread->t_ctl_waitq,
822 !list_empty(&qmt->qmt_reba_list) ||
823 !thread_is_running(thread), &lwi);
825 spin_lock(&qmt->qmt_reba_lock);
826 list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
828 list_del_init(&lqe->lqe_link);
/* drop the spinlock while glimpsing: it can block on RPCs */
829 spin_unlock(&qmt->qmt_reba_lock);
831 if (thread_is_running(thread))
832 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
835 spin_lock(&qmt->qmt_reba_lock);
837 spin_unlock(&qmt->qmt_reba_lock);
839 if (!thread_is_running(thread))
/* signal qmt_stop_reba_thread() that we are done */
844 thread_set_flags(thread, SVC_STOPPED);
845 wake_up(&thread->t_ctl_waitq);
/*
 * Spawn the rebalance kthread and wait until it reports either SVC_RUNNING
 * or SVC_STOPPED before returning, so callers see a fully started (or
 * definitively failed) thread.  Returns 0 on success or the kthread_run()
 * error.  NOTE(review): the final return path is elided from this excerpt.
 */
850 * Start rebalance thread. Called when the QMT is being setup
852 int qmt_start_reba_thread(struct qmt_device *qmt)
854 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
855 struct l_wait_info lwi = { 0 };
856 struct task_struct *task;
859 task = kthread_run(qmt_reba_thread, (void *)qmt,
860 "qmt_reba_%s", qmt->qmt_svname);
862 CERROR("%s: failed to start rebalance thread (%ld)\n",
863 qmt->qmt_svname, PTR_ERR(task));
/* mark stopped so qmt_stop_reba_thread() has nothing to wait for */
864 thread_set_flags(thread, SVC_STOPPED);
865 RETURN(PTR_ERR(task));
/* wait for the thread to confirm startup (or early failure) */
868 l_wait_event(thread->t_ctl_waitq,
869 thread_is_running(thread) || thread_is_stopped(thread),
/*
 * Request the rebalance thread to stop and block until it has fully exited
 * (SVC_STOPPED).  On return the rebalance list must be empty — the thread
 * drains it on shutdown.
 */
876 * Stop rebalance thread. Called when the QMT is about to shutdown.
878 void qmt_stop_reba_thread(struct qmt_device *qmt)
880 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
882 if (!thread_is_stopped(thread)) {
883 struct l_wait_info lwi = { 0 };
885 thread_set_flags(thread, SVC_STOPPING);
/* wake the thread so it notices the stop request */
886 wake_up(&thread->t_ctl_waitq);
888 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
/* the thread drained qmt_reba_list before setting SVC_STOPPED */
891 LASSERT(list_empty(&qmt->qmt_reba_list));