4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012, 2013, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
40 #include "qmt_internal.h"
/*
 * qmt_intent_policy - LDLM intent policy for quota locks on the quota
 * master target (QMT).  Services the intent embedded in a quota lock
 * enqueue: IT_QUOTA_DQACQ (acquire/release quota space on a per-ID lock)
 * or a new slave connection on the global lock.  On success the lock lvb
 * is packed in the reply for the slave.
 *
 * NOTE(review): this listing is an excerpt; NULL checks, braces and some
 * statements between the visible lines are elided from view.
 */
42 /* intent policy function called from mdt_intent_opc() when the intent is of
44 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
45 struct ptlrpc_request *req, struct ldlm_lock **lockp,
48 struct qmt_device *qmt = lu2qmt_dev(ld);
49 struct ldlm_intent *it;
50 struct quota_body *reqbody;
51 struct quota_body *repbody;
52 struct obd_uuid *uuid;
53 struct lquota_lvb *lvb;
54 struct ldlm_resource *res = (*lockp)->l_resource;
/* switch the request capsule to the quota intent format and reserve
 * room for the lvb in the reply before it is packed below */
58 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
59 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
60 ldlm_lvbo_size(*lockp));
62 /* extract quota body and intent opc */
63 it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
/* a missing intent or quota body buffer is a fatal protocol error */
65 RETURN(err_serious(-EFAULT));
67 reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
69 RETURN(err_serious(-EFAULT));
/* pack the reply early so failures can be reported to the client */
72 rc = req_capsule_server_pack(&req->rq_pill);
74 CERROR("Can't pack response, rc %d\n", rc);
75 RETURN(err_serious(rc));
78 repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
80 RETURN(err_serious(-EFAULT));
/* identify the requesting slave by its export uuid */
82 uuid = &(*lockp)->l_export->exp_client_uuid;
85 case IT_QUOTA_DQACQ: {
86 struct lquota_entry *lqe;
87 struct ldlm_lock *lock;
/* DQACQ is only valid on a per-ID resource (non-zero quota seq
 * component), never on the global quota lock */
89 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
90 /* acquire on global lock? something is wrong ... */
91 GOTO(out, rc = -EPROTO);
93 /* verify global lock isn't stale */
94 if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
95 GOTO(out, rc = -ENOLCK);
97 lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
99 GOTO(out, rc = -ENOLCK);
/* the lquota entry was stored in lr_lvb_data by qmt_lvbo_init() */
102 lqe = res->lr_lvb_data;
103 LASSERT(lqe != NULL);
106 /* acquire quota space */
107 rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
108 reqbody->qb_count, reqbody->qb_usage,
117 /* new connection from slave */
/* connection setup must use the global resource (quota seq == 0) */
119 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
120 /* connection on per-ID lock? something is wrong ... */
121 GOTO(out, rc = -EPROTO);
/* hand the slave its dedicated index FID and version in the reply */
123 rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
124 &repbody->qb_slv_fid,
125 &repbody->qb_slv_ver, uuid);
/* unknown intent opcode: log and fail the enqueue */
131 CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
133 GOTO(out, rc = err_serious(-EINVAL));
136 /* on success, pack lvb in reply */
137 lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
138 lvb_len = ldlm_lvbo_size(*lockp);
139 lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
/* ldlm_lvbo_fill() returned an error; propagate it */
141 GOTO(out, rc = lvb_len);
/* shrink the reply buffer to the lvb size actually filled */
143 req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
/*
 * qmt_lvbo_init - attach backing data to a quota ldlm resource.
 * For a per-ID resource, lr_lvb_data is set to the matching lquota_entry;
 * for the global resource, it is set to the dt_object of the global index.
 * Finally lr_lvb_len is set to the (static) quota lvb size.
 *
 * NOTE(review): excerpt — some error branches and returns between the
 * visible lines are elided from view.
 */
150 * Initialize quota LVB associated with quota indexes.
151 * Called with res->lr_lvb_sem held
153 int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
156 struct qmt_thread_info *qti;
157 struct qmt_device *qmt = lu2qmt_dev(ld);
158 int pool_id, pool_type, qtype;
162 LASSERT(res != NULL);
/* quota locks are plain (LDLM_PLAIN) locks; ignore anything else */
164 if (res->lr_type != LDLM_PLAIN)
/* nothing to do if the lvb data is already set up or the resource
 * does not belong to the quota sequence */
167 if (res->lr_lvb_data ||
168 res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
175 /* initialize environment */
176 rc = lu_env_init(env, LCT_MD_THREAD);
181 /* extract global index FID and quota identifier */
182 fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
184 /* sanity check the global index FID */
185 rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
187 CERROR("can't extract pool information from FID "DFID"\n",
188 PFID(&qti->qti_fid));
/* a non-zero quota seq component identifies a per-ID lock */
192 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
193 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
194 * we are thus dealing with an ID lock. */
195 struct lquota_entry *lqe;
197 /* Find the quota entry associated with the quota id */
198 lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
201 GOTO(out, rc = PTR_ERR(lqe));
203 /* store reference to lqe in lr_lvb_data */
204 res->lr_lvb_data = lqe;
205 LQUOTA_DEBUG(lqe, "initialized res lvb");
207 struct dt_object *obj;
209 /* lookup global index */
210 obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
212 GOTO(out, rc = PTR_ERR(obj));
213 if (!dt_object_exists(obj)) {
/* drop the reference taken by dt_locate() before bailing out */
214 lu_object_put(env, &obj->do_lu);
215 GOTO(out, rc = -ENOENT);
218 /* store reference to global index object in lr_lvb_data */
219 res->lr_lvb_data = obj;
220 CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
/* lvb size is the same for global and per-ID quota locks */
223 res->lr_lvb_len = sizeof(struct lquota_lvb);
/*
 * qmt_lvbo_update - process the lvb returned by a slave in a glimpse
 * reply on a per-ID quota lock.  The slave reports how much quota space
 * it released (lvb_id_rel) and how much it might release later
 * (lvb_id_may_rel); released space is given back via qmt_dqacq0() with
 * the QUOTA_DQACQ_FL_REL flag.
 *
 * NOTE(review): excerpt — NULL checks and some statements between the
 * visible lines are elided from view.
 */
233 * Update LVB associated with the global quota index.
234 * This function is called from the DLM itself after a glimpse callback, in this
235 * case valid ptlrpc request is passed.
237 int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
238 struct ptlrpc_request *req, int increase_only)
241 struct qmt_thread_info *qti;
242 struct qmt_device *qmt = lu2qmt_dev(ld);
243 struct lquota_entry *lqe;
244 struct lquota_lvb *lvb;
245 struct ldlm_lock *lock;
246 struct obd_export *exp;
250 LASSERT(res != NULL);
/* only per-ID locks carry release information in the lvb */
255 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
256 /* no need to update lvb for global quota locks */
/* extract (and byte-swap if needed) the lvb packed in the reply */
259 lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
260 lustre_swab_lquota_lvb);
262 CERROR("%s: failed to extract lvb from request\n",
/* per-ID resource: lr_lvb_data holds the lquota entry */
267 lqe = res->lr_lvb_data;
268 LASSERT(lqe != NULL);
271 LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
272 lvb->lvb_id_rel, lvb->lvb_id_may_rel);
274 if (lvb->lvb_id_rel == 0) {
275 /* nothing to release */
276 if (lvb->lvb_id_may_rel != 0)
277 /* but might still release later ... */
278 lqe->lqe_may_rel += lvb->lvb_id_may_rel;
279 GOTO(out_lqe, rc = 0);
282 /* allocate environment */
285 GOTO(out_lqe, rc = -ENOMEM);
287 /* initialize environment */
288 rc = lu_env_init(env, LCT_MD_THREAD);
293 /* The request is a glimpse callback which was sent via the
294 * reverse import to the slave. What we care about here is the
295 * export associated with the slave and req->rq_export is
296 * definitely not what we are looking for (it is actually set to
298 * Therefore we extract the lock from the request argument
299 * and use lock->l_export. */
300 lock = ldlm_request_lock(req);
302 CERROR("%s: failed to get lock from request!\n",
304 GOTO(out_env_init, rc = PTR_ERR(lock));
/* take an export reference for the duration of the release */
307 exp = class_export_get(lock->l_export);
309 CERROR("%s: failed to get export from lock!\n",
311 GOTO(out_env_init, rc = -EFAULT);
314 /* release quota space */
315 rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
316 QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
/* the amount actually released must match what the slave reported */
317 if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
318 LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
319 LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
320 lvb->lvb_id_rel, rc);
321 class_export_put(exp);
323 GOTO(out_env_init, rc);
/*
 * qmt_lvbo_size - tell the ldlm layer how big the lvb buffer must be.
 * The size is constant for all quota locks (struct lquota_lvb).
 */
335 * Report size of lvb to ldlm layer in order to allocate lvb buffer
336 * As far as quota locks are concerned, the size is static and is the same
337 * for both global and per-ID locks which share the same lvb format.
339 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
341 return sizeof(struct lquota_lvb);
/*
 * qmt_lvbo_fill - pack the current quota lvb into the reply buffer.
 * Per-ID locks return the current qunit value and edquot flag taken from
 * the lquota entry; the global lock returns the version of the global
 * index.  Returns the number of bytes packed on success.
 *
 * NOTE(review): excerpt — buffer-length checks and lock/unlock calls
 * between the visible lines are elided from view.
 */
345 * Fill request buffer with quota lvb
347 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
350 struct ldlm_resource *res = lock->l_resource;
351 struct lquota_lvb *qlvb = lvb;
354 LASSERT(res != NULL);
/* only plain locks on quota resources with initialized lvb data */
356 if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
357 res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
/* non-zero quota seq component identifies a per-ID lock */
360 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
361 /* no ID quota lock associated with UID/GID 0 or with a seq 0,
362 * we are thus dealing with an ID lock. */
363 struct lquota_entry *lqe = res->lr_lvb_data;
365 /* return current qunit value & edquot flags in lvb */
367 qlvb->lvb_id_qunit = lqe->lqe_qunit;
370 qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
373 /* global quota lock */
376 struct dt_object *obj = res->lr_lvb_data;
382 /* initialize environment */
383 rc = lu_env_init(env, LCT_LOCAL);
389 /* return current version of global index */
390 qlvb->lvb_glb_ver = dt_version_get(env, obj);
/* lvb size is fixed regardless of lock flavour */
396 RETURN(sizeof(struct lquota_lvb));
/*
 * qmt_lvbo_free - drop the backing reference stored in lr_lvb_data when
 * a quota ldlm resource goes away: an lquota_entry reference for per-ID
 * resources, a dt_object reference for the global resource.
 *
 * NOTE(review): excerpt — env allocation/cleanup and the lqe release
 * call between the visible lines are elided from view.
 */
400 * Free lvb associated with a given ldlm resource
401 * we don't really allocate a lvb, lr_lvb_data just points to
402 * the appropriate backend structures.
404 int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
/* nothing was ever attached to this resource */
408 if (res->lr_lvb_data == NULL)
411 if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
412 struct lquota_entry *lqe = res->lr_lvb_data;
414 /* release lqe reference */
417 struct dt_object *obj = res->lr_lvb_data;
425 /* initialize environment */
426 rc = lu_env_init(env, LCT_LOCAL);
432 /* release object reference */
433 lu_object_put(env, &obj->do_lu);
/* clear the pointer so a stale reference is never reused */
438 res->lr_lvb_data = NULL;
/* callback type used by qmt_glimpse_lock() to decide, per granted lock,
 * whether the owning slave should receive a glimpse */
444 typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
445 struct obd_uuid *, union ldlm_gl_desc *,
/*
 * qmt_glimpse_lock - walk the granted-lock list of \res, build a glimpse
 * work list (one entry per slave to notify, filtered by \cb if given) and
 * hand it to ldlm_glimpse_locks().  Entries left on the list afterwards
 * are slaves that could not be notified; their lock references are
 * released here.
 *
 * NOTE(review): excerpt — list locking, work allocation and some error
 * branches between the visible lines are elided from view.
 */
448 * Send glimpse callback to slaves holding a lock on resource \res.
449 * This is used to notify slaves of new quota settings or to claim quota space
452 * \param env - is the environment passed by the caller
453 * \param qmt - is the quota master target
454 * \param res - is the dlm resource associated with the quota object
455 * \param desc - is the glimpse descriptor to pack in glimpse callback
456 * \param cb - is the callback function called on every lock and determine
457 * whether a glimpse should be issued
458 * \param arg - is an opaq parameter passed to the callback function
460 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
461 struct ldlm_resource *res, union ldlm_gl_desc *desc,
462 qmt_glimpse_cb_t cb, void *arg)
464 cfs_list_t *tmp, *pos;
465 CFS_LIST_HEAD(gl_list);
470 /* scan list of granted locks */
471 cfs_list_for_each(pos, &res->lr_granted) {
472 struct ldlm_glimpse_work *work;
473 struct ldlm_lock *lock;
474 struct obd_uuid *uuid;
476 lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
477 LASSERT(lock->l_export);
478 uuid = &lock->l_export->exp_client_uuid;
/* let the filter callback decide whether this slave needs a glimpse */
481 rc = cb(env, qmt, uuid, desc, arg);
483 /* slave should not be notified */
486 /* something wrong happened, we still notify */
487 CERROR("%s: callback function failed to "
488 "determine whether slave %s should be "
489 "notified (%d)\n", qmt->qmt_svname,
490 obd_uuid2str(uuid), rc);
495 CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
/* queue a work item holding a reference on the lock */
500 cfs_list_add_tail(&work->gl_list, &gl_list);
501 work->gl_lock = LDLM_LOCK_GET(lock);
503 work->gl_desc = desc;
508 if (cfs_list_empty(&gl_list)) {
509 CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
513 /* issue glimpse callbacks to all connected slaves */
514 rc = ldlm_glimpse_locks(res, &gl_list);
/* entries still on the list were not glimpsed successfully; log and
 * drop the lock references taken above */
516 cfs_list_for_each_safe(pos, tmp, &gl_list) {
517 struct ldlm_glimpse_work *work;
519 work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
521 cfs_list_del(&work->gl_list);
522 CERROR("%s: failed to notify %s of new quota settings\n",
524 obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
525 LDLM_LOCK_RELEASE(work->gl_lock);
/*
 * qmt_glb_lock_notify - broadcast new quota settings (limits, grace time,
 * version) to all slaves holding the global quota lock, via a glimpse on
 * the global index resource.  A resource lookup failure simply means no
 * slave has enqueued the global lock yet.
 */
533 * Send glimpse request to all global quota locks to push new quota setting to
536 * \param env - is the environment passed by the caller
537 * \param lqe - is the lquota entry which has new settings
538 * \param ver - is the version associated with the setting change
540 void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
543 struct qmt_thread_info *qti = qmt_info(env);
544 struct qmt_pool_info *pool = lqe2qpi(lqe);
545 struct ldlm_resource *res = NULL;
/* rebuild the global index FID from the pool key and quota type */
549 lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
550 pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
552 /* send glimpse callback to notify slaves of new quota settings */
553 qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id;
554 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
555 qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
556 qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
557 qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
558 qti->qti_gl_desc.lquota_desc.gl_ver = ver;
560 /* look up ldlm resource associated with global index */
561 fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
562 res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
565 /* this might happen if no slaves have enqueued global quota
567 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
568 "with "DFID, PFID(&qti->qti_fid));
/* glimpse every slave holding the global lock, then drop our
 * resource reference */
572 rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
574 ldlm_resource_putref(res);
/*
 * qmt_id_lock_cb - per-lock filter used by qmt_id_lock_glimpse() when
 * broadcasting a new qunit value.  \arg is the uuid of the slave that
 * triggered the change (or NULL); when the lock belongs to that slave
 * the branch below is taken so it is presumably excluded from the
 * glimpse (return value elided from this excerpt — TODO confirm).
 */
578 /* Callback function used to select locks that should be glimpsed when
579 * broadcasting the new qunit value */
580 static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
581 struct obd_uuid *uuid, union ldlm_gl_desc *desc,
584 struct obd_uuid *slv_uuid = arg;
586 if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
/*
 * qmt_id_lock_glimpse - issue a glimpse on the per-ID lock of \lqe to
 * push the new qunit value and/or edquot flag to slaves.  Also maintains
 * lqe_revoke_time (set when qunit has shrunk to the pool's smallest
 * value) and resets lqe_may_rel before collecting fresh release hints
 * from glimpse replies.
 *
 * NOTE(review): excerpt — lock/unlock pairing and some branches between
 * the visible lines are elided from view.
 */
592 * Send glimpse request on per-ID lock to push new qunit value to slave.
594 * \param env - is the environment passed by the caller
595 * \param qmt - is the quota master target device
596 * \param lqe - is the lquota entry with the new qunit value
597 * \param uuid - is the uuid of the slave acquiring space, if any
599 static void qmt_id_lock_glimpse(const struct lu_env *env,
600 struct qmt_device *qmt,
601 struct lquota_entry *lqe, struct obd_uuid *uuid)
603 struct qmt_thread_info *qti = qmt_info(env);
604 struct qmt_pool_info *pool = lqe2qpi(lqe);
605 struct ldlm_resource *res = NULL;
/* nothing to push if quota enforcement is off for this id */
609 if (!lqe->lqe_enforced)
/* build the per-ID resource name from pool key, quota type and id */
612 lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
613 pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
614 fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
615 res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
618 /* this might legitimately happens if slaves haven't had the
619 * opportunity to enqueue quota lock yet. */
620 LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
621 "lock "DFID, PFID(&qti->qti_fid));
/* even without a resource, record when qunit first reached the
 * smallest value so revocation can be tracked */
623 if (lqe->lqe_revoke_time == 0 &&
624 lqe->lqe_qunit == pool->qpi_least_qunit)
625 lqe->lqe_revoke_time = cfs_time_current_64();
626 lqe_write_unlock(lqe);
631 /* The purpose of glimpse callback on per-ID lock is twofold:
632 * - notify slaves of new qunit value and hope they will release some
633 * spare quota space in return
634 * - notify slaves that master ran out of quota space and there is no
635 * need to send acquire request any more until further notice */
637 /* fill glimpse descriptor with lqe settings */
639 qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
641 qti->qti_gl_desc.lquota_desc.gl_flags = 0;
642 qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
644 if (lqe->lqe_revoke_time == 0 &&
645 qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
646 /* reset lqe_may_rel, it will be updated on glimpse callback
647 * replies if needed */
648 lqe->lqe_may_rel = 0;
650 /* The rebalance thread is the only thread which can issue glimpses */
651 LASSERT(!lqe->lqe_gl);
653 lqe_write_unlock(lqe);
655 /* issue glimpse callback to slaves */
656 rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
657 uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
/* qunit reached the floor while glimpsing: record the revoke time
 * and re-evaluate the edquot state */
660 if (lqe->lqe_revoke_time == 0 &&
661 qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
662 lqe->lqe_qunit == pool->qpi_least_qunit) {
663 lqe->lqe_revoke_time = cfs_time_current_64();
664 qmt_adjust_edquot(lqe, cfs_time_current_sec());
666 LASSERT(lqe->lqe_gl);
668 lqe_write_unlock(lqe);
670 ldlm_resource_putref(res);
/*
 * qmt_id_lock_notify - queue \lqe on the rebalance list (unless already
 * queued or the QMT is stopping) and wake the rebalance thread, which
 * will issue the actual per-ID glimpse asynchronously.
 */
675 * Schedule a glimpse request on per-ID locks to push new qunit value or
676 * edquot flag to quota slaves.
678 * \param qmt - is the quota master target device
679 * \param lqe - is the lquota entry with the new qunit value
681 void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
/* only queue once: lqe_link is non-empty while already pending */
687 spin_lock(&qmt->qmt_reba_lock);
688 if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
689 cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
692 spin_unlock(&qmt->qmt_reba_lock);
/* wake the rebalance thread to process the new entry */
695 cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
/*
 * qmt_reba_thread - rebalance thread main loop.  Sleeps until entries
 * appear on qmt_reba_list (or the thread is asked to stop), then drains
 * the list under qmt_reba_lock and issues a per-ID glimpse for each
 * entry via qmt_id_lock_glimpse().
 *
 * NOTE(review): excerpt — env allocation, loop braces and exit path
 * between the visible lines are elided from view.
 */
702 * The rebalance thread is in charge of sending glimpse callbacks on per-ID
703 * quota locks owned by slaves in order to notify them of:
704 * - a qunit shrink in which case slaves might release quota space back in
706 * - set/clear edquot flag used to cache the "quota exhausted" state of the
707 * master. When the flag is set, slaves know that there is no need to
708 * try to acquire quota from the master since this latter has already
709 * distributed all the space.
711 static int qmt_reba_thread(void *arg)
713 struct qmt_device *qmt = (struct qmt_device *)arg;
714 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
715 struct l_wait_info lwi = { 0 };
717 struct lquota_entry *lqe, *tmp;
725 rc = lu_env_init(env, LCT_MD_THREAD);
727 CERROR("%s: failed to init env.", qmt->qmt_svname);
/* signal the starter (qmt_start_reba_thread) that we are up */
732 thread_set_flags(thread, SVC_RUNNING);
733 cfs_waitq_signal(&thread->t_ctl_waitq);
/* block until there is work or a stop request */
736 l_wait_event(thread->t_ctl_waitq,
737 !cfs_list_empty(&qmt->qmt_reba_list) ||
738 !thread_is_running(thread), &lwi);
740 spin_lock(&qmt->qmt_reba_lock);
741 cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
743 cfs_list_del_init(&lqe->lqe_link);
/* drop the spinlock while glimpsing: qmt_id_lock_glimpse() sleeps */
744 spin_unlock(&qmt->qmt_reba_lock);
746 if (thread_is_running(thread))
747 qmt_id_lock_glimpse(env, qmt, lqe, NULL);
750 spin_lock(&qmt->qmt_reba_lock);
752 spin_unlock(&qmt->qmt_reba_lock);
754 if (!thread_is_running(thread))
/* signal the stopper (qmt_stop_reba_thread) that we are done */
759 thread_set_flags(thread, SVC_STOPPED);
760 cfs_waitq_signal(&thread->t_ctl_waitq);
/*
 * qmt_start_reba_thread - spawn the rebalance kthread and wait until it
 * reports SVC_RUNNING (or SVC_STOPPED on early failure).  Returns the
 * kthread_run() error on spawn failure.
 */
765 * Start rebalance thread. Called when the QMT is being setup
767 int qmt_start_reba_thread(struct qmt_device *qmt)
769 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
770 struct l_wait_info lwi = { 0 };
774 task = kthread_run(qmt_reba_thread, (void *)qmt,
775 "qmt_reba_%s", qmt->qmt_svname);
777 CERROR("%s: failed to start rebalance thread (%ld)\n",
778 qmt->qmt_svname, PTR_ERR(task));
/* mark stopped so qmt_stop_reba_thread() has nothing to wait for */
779 thread_set_flags(thread, SVC_STOPPED);
780 RETURN(PTR_ERR(task));
/* wait for the thread to come up (or die during startup) */
783 l_wait_event(thread->t_ctl_waitq,
784 thread_is_running(thread) || thread_is_stopped(thread),
791 * Stop rebalance thread. Called when the QMT is about to shutdown.
793 void qmt_stop_reba_thread(struct qmt_device *qmt)
795 struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
797 if (!thread_is_stopped(thread)) {
798 struct l_wait_info lwi = { 0 };
800 thread_set_flags(thread, SVC_STOPPING);
801 cfs_waitq_signal(&thread->t_ctl_waitq);
803 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
806 LASSERT(cfs_list_empty(&qmt->qmt_reba_list));