4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
40 #include "qmt_internal.h"
/*
 * Intent policy for quota lock enqueues coming from quota slaves.
 *
 * \param env   - lu environment provided by the caller
 * \param ld    - quota master target lu_device (converted below)
 * \param req   - incoming ptlrpc request carrying the ldlm intent
 * \param lockp - lock being enqueued; its export identifies the slave
 *
 * Returns 0 on success and a err_serious()-wrapped negative errno on failure.
 * NOTE(review): several lines of this body (NULL checks, switch on the intent
 * opcode, epilogue) are elided in this view — confirm against full source.
 */
/* intent policy function called from mdt_intent_opc() when the intent is of
int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
struct ptlrpc_request *req, struct ldlm_lock **lockp,
struct qmt_device *qmt = lu2qmt_dev(ld);
struct ldlm_intent *it;
struct quota_body *reqbody;
struct quota_body *repbody;
struct obd_uuid *uuid;
struct lquota_lvb *lvb;
/* re-interpret the capsule as a quota intent enqueue request */
req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
/* extract quota body and intent opc */
it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
RETURN(err_serious(-EFAULT));
reqbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
RETURN(err_serious(-EFAULT));
/* prepare reply buffers */
rc = req_capsule_server_pack(&req->rq_pill);
CERROR("Can't pack response, rc %d\n", rc);
RETURN(err_serious(rc));
repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
RETURN(err_serious(-EFAULT));
/* uuid of the slave which issued this enqueue */
uuid = &(*lockp)->l_export->exp_client_uuid;
/* XXX: to be added in a next patch */
GOTO(out, -EOPNOTSUPP);
/* new connection from slave */
rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
&repbody->qb_slv_ver, uuid);
CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
GOTO(out, rc = err_serious(-EINVAL));
/* on success, pack lvb in reply */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
ldlm_lvbo_size(*lockp));
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
/*
 * Initialize the lvb (lock value block) of a quota ldlm resource.
 *
 * Per-ID resources get a reference to the matching lquota_entry stored in
 * lr_lvb_data; the global resource gets a reference to the global index
 * dt_object instead. lr_lvb_len is the fixed lquota_lvb size in both cases.
 *
 * \param ld  - quota master target lu_device
 * \param res - ldlm resource being initialized
 */
* Initialize quota LVB associated with quota indexes.
* Called with res->lr_lvb_sem held
int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
struct qmt_thread_info *qti;
struct qmt_device *qmt = lu2qmt_dev(ld);
int pool_id, pool_type, qtype;
LASSERT(res != NULL);
/* quota locks are PLAIN locks; nothing to do for other resource types */
if (res->lr_type != LDLM_PLAIN)
/* skip resources already initialized or outside the quota sequence */
if (res->lr_lvb_data ||
res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
/* initialize environment */
rc = lu_env_init(env, LCT_MD_THREAD);
/* extract global index FID and quota identifier */
fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
/* sanity check the global index FID */
rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
CERROR("can't extract pool information from FID "DFID"\n",
PFID(&qti->qti_fid));
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
/* no ID quota lock associated with UID/GID 0 or with a seq 0,
* we are thus dealing with an ID lock. */
struct lquota_entry *lqe;
/* Find the quota entry associated with the quota id */
lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
GOTO(out, rc = PTR_ERR(lqe));
/* store reference to lqe in lr_lvb_data */
res->lr_lvb_data = lqe;
LQUOTA_DEBUG(lqe, "initialized res lvb");
struct dt_object *obj;
/* lookup global index */
obj = dt_locate(env, qmt->qmt_child, &qti->qti_fid);
GOTO(out, rc = PTR_ERR(obj));
if (!dt_object_exists(obj)) {
/* drop the reference taken by dt_locate() before bailing out */
lu_object_put(env, &obj->do_lu);
GOTO(out, rc = -ENOENT);
/* store reference to global index object in lr_lvb_data */
res->lr_lvb_data = obj;
CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
/* lvb size is static for quota locks, see qmt_lvbo_size() */
res->lr_lvb_len = sizeof(struct lquota_lvb);
/*
 * Update the lvb of a per-ID quota resource from a glimpse reply.
 *
 * \param ld            - quota master target lu_device
 * \param res           - ldlm resource whose lvb is refreshed
 * \param req           - ptlrpc request carrying the client-provided lvb
 * \param increase_only - presumably restricts updates to growing values;
 *                        elided body — TODO confirm against full source
 */
* Update LVB associated with the global quota index.
* This function is called from the DLM itself after a glimpse callback, in this
* case valid ptlrpc request is passed.
int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
struct ptlrpc_request *req, int increase_only)
struct qmt_thread_info *qti;
struct qmt_device *qmt = lu2qmt_dev(ld);
struct lquota_entry *lqe;
struct lquota_lvb *lvb;
LASSERT(res != NULL);
/* a zero quota-seq identifies the global lock, see qmt_lvbo_init() */
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
/* no need to update lvb for global quota locks */
/* per-ID resource: lr_lvb_data holds the lquota_entry reference */
lqe = res->lr_lvb_data;
LASSERT(lqe != NULL);
/* allocate environment */
/* initialize environment */
rc = lu_env_init(env, LCT_MD_THREAD);
/* extract lvb shipped by the slave in the glimpse reply */
lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
CERROR("%s: failed to extract lvb from request\n",
/* XXX: Space release handling to be added in a next patch */
254 * Report size of lvb to ldlm layer in order to allocate lvb buffer
255 * As far as quota locks are concerned, the size is static and is the same
256 * for both global and per-ID locks which shares the same lvb format.
258 int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
260 return sizeof(struct lquota_lvb);
/*
 * Pack the current lvb content into the buffer supplied by the ldlm layer.
 *
 * Per-ID locks report the current qunit value and edquot flag taken from the
 * lquota_entry; the global lock reports the version of the global index.
 *
 * \param ld   - quota master target lu_device
 * \param lock - lock whose resource identifies global vs per-ID
 * \param lvb  - output buffer, interpreted as a struct lquota_lvb
 *
 * \retval sizeof(struct lquota_lvb) on success
 */
* Fill request buffer with quota lvb
int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
struct ldlm_resource *res = lock->l_resource;
struct lquota_lvb *qlvb = lvb;
LASSERT(res != NULL);
/* only initialized quota resources carry a meaningful lvb */
if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
/* no ID quota lock associated with UID/GID 0 or with a seq 0,
* we are thus dealing with an ID lock. */
struct lquota_entry *lqe = res->lr_lvb_data;
/* return current qunit value & edquot flags in lvb */
qlvb->lvb_id_qunit = lqe->lqe_qunit;
qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
/* global quota lock */
struct dt_object *obj = res->lr_lvb_data;
/* initialize environment */
rc = lu_env_init(env, LCT_LOCAL);
/* return current version of global index */
qlvb->lvb_glb_ver = dt_version_get(env, obj);
RETURN(sizeof(struct lquota_lvb));
/*
 * Release the lvb reference held by a quota ldlm resource.
 *
 * \param ld  - quota master target lu_device
 * \param res - resource being cleaned up; lr_lvb_data is either an
 *              lquota_entry (per-ID) or a dt_object (global index)
 */
* Free lvb associated with a given ldlm resource
* we don't really allocate a lvb, lr_lvb_data just points to
* the appropriate backend structures.
int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
/* nothing to do if the resource was never initialized */
if (res->lr_lvb_data == NULL)
if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
struct lquota_entry *lqe = res->lr_lvb_data;
/* release lqe reference */
struct dt_object *obj = res->lr_lvb_data;
/* initialize environment */
rc = lu_env_init(env, LCT_LOCAL);
/* release object reference */
lu_object_put(env, &obj->do_lu);
/* clear the pointer so the reference cannot be dropped twice */
res->lr_lvb_data = NULL;
/* per-lock filter callback: returns whether a given slave (identified by its
 * uuid) should receive the glimpse — see qmt_glimpse_lock() below */
typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
struct obd_uuid *, union ldlm_gl_desc *,
* Send glimpse callback to slaves holding a lock on resource \res.
* This is used to notify slaves of new quota settings or to claim quota space
* \param env - is the environment passed by the caller
* \param qmt - is the quota master target
* \param res - is the dlm resource associated with the quota object
* \param desc - is the glimpse descriptor to pack in glimpse callback
* \param cb - is the callback function called on every lock and determine
* whether a glimpse should be issued
* \param arg - is an opaq parameter passed to the callback function
static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
struct ldlm_resource *res, union ldlm_gl_desc *desc,
qmt_glimpse_cb_t cb, void *arg)
cfs_list_t *tmp, *pos;
CFS_LIST_HEAD(gl_list);
/* scan list of granted locks */
cfs_list_for_each(pos, &res->lr_granted) {
struct ldlm_glimpse_work *work;
struct ldlm_lock *lock;
struct obd_uuid *uuid;
lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
LASSERT(lock->l_export);
uuid = &lock->l_export->exp_client_uuid;
/* ask the filter callback whether this slave should be glimpsed */
rc = cb(env, qmt, uuid, desc, arg);
/* slave should not be notified */
/* something wrong happened, we still notify */
CERROR("%s: callback function failed to "
"determine whether slave %s should be "
"notified (%d)\n", qmt->qmt_svname,
obd_uuid2str(uuid), rc);
CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
/* queue a glimpse work item; the lock reference taken here is
 * released once the glimpse has been issued (or failed) below */
cfs_list_add_tail(&work->gl_list, &gl_list);
work->gl_lock = LDLM_LOCK_GET(lock);
work->gl_desc = desc;
if (cfs_list_empty(&gl_list)) {
CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
/* issue glimpse callbacks to all connected slaves */
rc = ldlm_glimpse_locks(res, &gl_list);
/* entries left on gl_list are slaves which could not be glimpsed */
cfs_list_for_each_safe(pos, tmp, &gl_list) {
struct ldlm_glimpse_work *work;
work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
cfs_list_del(&work->gl_list);
CERROR("%s: failed to notify %s of new quota settings\n",
qmt->qmt_svname,
obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
LDLM_LOCK_RELEASE(work->gl_lock);
/*
 * Broadcast new quota settings to all slaves holding the global quota lock.
 *
 * Packs the new hard/soft limits and index version into a glimpse descriptor
 * and issues glimpse callbacks on the ldlm resource of the global index.
 */
* Send glimpse request to all global quota locks to push new quota setting to
* \param env - is the environment passed by the caller
* \param lqe - is the lquota entry which has new settings
* \param ver - is the version associated with the setting change
void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
struct qmt_thread_info *qti = qmt_info(env);
struct qmt_pool_info *pool = lqe2qpi(lqe);
struct ldlm_resource *res = NULL;
/* qpi_key packs pool id (low 16 bits) and pool type (high bits) */
lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
/* send glimpse callback to notify slaves of new quota settings */
qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id;
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
qti->qti_gl_desc.lquota_desc.gl_ver = ver;
/* look up ldlm resource associated with global index */
fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
/* this might happen if no slaves have enqueued global quota
LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
"with "DFID, PFID(&qti->qti_fid));
/* no filter callback: every slave holding the lock is notified */
rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
ldlm_resource_putref(res);
/* Callback function used to select locks that should be glimpsed when
 * broadcasting the new qunit value */
/* \param arg - optional uuid of the slave which triggered the broadcast;
 * the visible check skips that slave (it already knows the new value) —
 * elided body, TODO confirm return-value convention against full source */
static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
struct obd_uuid *uuid, union ldlm_gl_desc *desc,
struct obd_uuid *slv_uuid = arg;
if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
/*
 * Glimpse per-ID locks to push a new qunit value / edquot state to slaves.
 * Only meaningful when the entry is enforced; the glimpse is skipped
 * otherwise.
 */
* Send glimpse request on per-ID lock to push new qunit value to slave.
* \param env - is the environment passed by the caller
* \param qmt - is the quota master target device
* \param lqe - is the lquota entry with the new qunit value
* \param uuid - is the uuid of the slave acquiring space, if any
static void qmt_id_lock_glimpse(const struct lu_env *env,
struct qmt_device *qmt,
struct lquota_entry *lqe, struct obd_uuid *uuid)
struct qmt_thread_info *qti = qmt_info(env);
struct qmt_pool_info *pool = lqe2qpi(lqe);
struct ldlm_resource *res = NULL;
/* no broadcast needed when quota isn't enforced for this ID */
if (!lqe->lqe_enforced)
/* build the FID/resid of the per-ID lock for this pool/type/id */
lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
/* this might legitimately happens if slaves haven't had the
* opportunity to enqueue quota lock yet. */
LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
"lock "DFID, PFID(&qti->qti_fid));
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
* - notify slaves that master ran out of quota space and there is no
* need to send acquire request any more until further notice */
/* fill glimpse descriptor with lqe settings */
qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
lqe_read_unlock(lqe);
/* The rebalance thread is the only thread which can issue glimpses */
LASSERT(!lqe->lqe_gl);
/* issue glimpse callback to slaves */
/* when a slave uuid is given, qmt_id_lock_cb filters that slave out */
rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
LASSERT(lqe->lqe_gl);
ldlm_resource_putref(res);
/*
 * Queue \a lqe on the rebalance list and wake the rebalance thread, which
 * performs the actual glimpse (see qmt_reba_thread / qmt_id_lock_glimpse).
 */
* Schedule a glimpse request on per-ID locks to push new qunit value or
* edquot flag to quota slaves.
* \param qmt - is the quota master target device
* \param lqe - is the lquota entry with the new qunit value
void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
/* qmt_reba_lock protects qmt_reba_list against the rebalance thread */
cfs_spin_lock(&qmt->qmt_reba_lock);
/* only queue once (lqe_link empty) and not while shutting down */
if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
cfs_spin_unlock(&qmt->qmt_reba_lock);
/* wake up the rebalance thread to process the new entry */
cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
/*
 * Main loop of the rebalance thread: drains qmt_reba_list and issues
 * per-ID glimpses for each queued lquota entry until asked to stop.
 */
* The rebalance thread is in charge of sending glimpse callbacks on per-ID
* quota locks owned by slaves in order to notify them of:
* - a qunit shrink in which case slaves might release quota space back in
* - set/clear edquot flag used to cache the "quota exhausted" state of the
* master. When the flag is set, slaves know that there is no need to
* try to acquire quota from the master since this latter has already
* distributed all the space.
static int qmt_reba_thread(void *arg)
struct qmt_device *qmt = (struct qmt_device *)arg;
struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
struct l_wait_info lwi = { 0 };
struct lquota_entry *lqe, *tmp;
char pname[MTI_NAME_MAXLEN];
rc = lu_env_init(env, LCT_MD_THREAD);
CERROR("%s: failed to init env.", qmt->qmt_svname);
/* name the kernel thread after the qmt service */
snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
cfs_daemonize(pname);
/* signal qmt_start_reba_thread() that we are up and running */
thread_set_flags(thread, SVC_RUNNING);
cfs_waitq_signal(&thread->t_ctl_waitq);
/* sleep until there is work queued or the thread is stopped */
l_wait_event(thread->t_ctl_waitq,
!cfs_list_empty(&qmt->qmt_reba_list) ||
!thread_is_running(thread), &lwi);
cfs_spin_lock(&qmt->qmt_reba_lock);
cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
cfs_list_del_init(&lqe->lqe_link);
/* drop the spinlock across the (blocking) glimpse below */
cfs_spin_unlock(&qmt->qmt_reba_lock);
if (thread_is_running(thread))
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
cfs_spin_lock(&qmt->qmt_reba_lock);
cfs_spin_unlock(&qmt->qmt_reba_lock);
if (!thread_is_running(thread))
/* signal qmt_stop_reba_thread() that we are done */
thread_set_flags(thread, SVC_STOPPED);
cfs_waitq_signal(&thread->t_ctl_waitq);
/*
 * Spawn the rebalance thread and wait until it reports either SVC_RUNNING
 * or SVC_STOPPED (early failure) before returning to the caller.
 */
* Start rebalance thread. Called when the QMT is being setup
int qmt_start_reba_thread(struct qmt_device *qmt)
struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
struct l_wait_info lwi = { 0 };
rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
CERROR("%s: failed to start rebalance thread (%d)\n",
qmt->qmt_svname, rc);
/* mark stopped so qmt_stop_reba_thread() has nothing to wait for */
thread_set_flags(thread, SVC_STOPPED);
/* wait for the thread to come up (or fail) before returning */
l_wait_event(thread->t_ctl_waitq,
thread_is_running(thread) || thread_is_stopped(thread),
/*
 * Ask the rebalance thread to stop and wait until it has fully exited.
 * After completion, the rebalance list must be empty.
 */
* Stop rebalance thread. Called when the QMT is about to shutdown.
void qmt_stop_reba_thread(struct qmt_device *qmt)
struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
if (!thread_is_stopped(thread)) {
struct l_wait_info lwi = { 0 };
/* request shutdown and wake the thread out of its wait */
thread_set_flags(thread, SVC_STOPPING);
cfs_waitq_signal(&thread->t_ctl_waitq);
/* wait for qmt_reba_thread() to set SVC_STOPPED */
l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
LASSERT(cfs_list_empty(&qmt->qmt_reba_list));