4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2011, 2012, Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <lustre_dlm.h>
38 #include <obd_class.h>
40 #include "qsd_internal.h"
43 * Return qsd_qtype_info structure associated with a global lock
45 * \param lock - is the global lock from which we should extract the qqi
46 * \param reset - whether lock->l_ast_data should be cleared
48 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
50 struct qsd_qtype_info *qqi;
53 lock_res_and_lock(lock);
54 qqi = lock->l_ast_data;
57 lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
59 lock->l_ast_data = NULL;
61 unlock_res_and_lock(lock);
63 if (reset && qqi != NULL) {
64 /* release qqi reference hold for the lock */
66 lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
72 * Return lquota entry structure associated with a per-ID lock
74 * \param lock - is the per-ID lock from which we should extract the lquota
76 * \param reset - whether lock->l_ast_data should be cleared
78 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
80 struct lquota_entry *lqe;
83 lock_res_and_lock(lock);
84 lqe = lock->l_ast_data;
88 lock->l_ast_data = NULL;
90 unlock_res_and_lock(lock);
92 if (reset && lqe != NULL)
93 /* release lqe reference hold for the lock */
99 * Glimpse callback handler for all quota locks. This function extracts
100 * information from the glimpse request.
102 * \param lock - is the lock targeted by the glimpse
103 * \param data - is a pointer to the glimpse ptlrpc request
104 * \param req - is the glimpse request
105 * \param desc - is the glimpse descriptor describing the purpose of the glimpse
107 * \param lvb - is the pointer to the lvb in the reply buffer
109 * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
110 * appropriate error on failure
112 static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
113 struct ldlm_gl_lquota_desc **desc, void **lvb)
118 LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
120 /* glimpse on quota locks always packs a glimpse descriptor */
121 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
123 /* extract glimpse descriptor */
124 *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
129 rc = req_capsule_server_pack(&req->rq_pill);
131 CERROR("Can't pack response, rc %d\n", rc);
136 *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
142 * Blocking callback handler for global index lock
144 * \param lock - is the lock for which ast occurred.
145 * \param desc - is the description of a conflicting lock in case of blocking
147 * \param data - is the value of lock->l_ast_data
148 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
149 * cancellation and blocking ast's.
151 static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
152 struct ldlm_lock_desc *desc, void *data,
159 case LDLM_CB_BLOCKING: {
160 struct lustre_handle lockh;
162 LDLM_DEBUG(lock, "blocking AST on global quota lock");
163 ldlm_lock2handle(lock, &lockh);
164 rc = ldlm_cli_cancel(&lockh);
167 case LDLM_CB_CANCELING: {
168 struct qsd_qtype_info *qqi;
170 LDLM_DEBUG(lock, "canceling global quota lock");
172 qqi = qsd_glb_ast_data_get(lock, true);
176 /* we are losing the global index lock, so let's mark the
177 * global & slave indexes as not up-to-date any more */
178 cfs_write_lock(&qqi->qqi_qsd->qsd_lock);
179 qqi->qqi_glb_uptodate = false;
180 qqi->qqi_slv_uptodate = false;
181 if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
182 memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
183 cfs_write_unlock(&qqi->qqi_qsd->qsd_lock);
185 CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
186 qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
188 /* kick off reintegration thread if not running already, if
189 * it's just local cancel (for stack clean up or eviction),
190 * don't re-trigger the reintegration. */
191 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
192 qsd_start_reint_thread(qqi);
194 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
199 LASSERTF(0, "invalid flags for blocking ast %d", flag);
206 * Glimpse callback handler for global quota lock.
208 * \param lock - is the lock targeted by the glimpse
209 * \param data - is a pointer to the glimpse ptlrpc request
211 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
213 struct ptlrpc_request *req = data;
214 struct qsd_qtype_info *qqi;
215 struct ldlm_gl_lquota_desc *desc;
216 struct lquota_lvb *lvb;
217 struct lquota_glb_rec rec;
221 rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
225 qqi = qsd_glb_ast_data_get(lock, false);
228 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
230 LCONSOLE_INFO("%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
231 " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
232 desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
235 if (desc->gl_ver == 0) {
236 CERROR("%s: invalid global index version "LPU64"\n",
237 qqi->qqi_qsd->qsd_svname, desc->gl_ver);
238 GOTO(out_qqi, rc = -EINVAL);
241 /* extract new hard & soft limits from the glimpse descriptor */
242 rec.qbr_hardlimit = desc->gl_hardlimit;
243 rec.qbr_softlimit = desc->gl_softlimit;
247 /* We can't afford disk io in the context of glimpse callback handling
248 * thread, so the on-disk global limits update has to be deferred. */
249 qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
253 lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
260 struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
262 qsd_glb_blocking_ast,
267 * Blocking callback handler for per-ID lock
269 * \param lock - is the lock for which ast occurred.
270 * \param desc - is the description of a conflicting lock in case of blocking
272 * \param data - is the value of lock->l_ast_data
273 * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
274 * cancellation and blocking ast's.
276 static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
277 void *data, int flag)
279 struct lustre_handle lockh;
284 case LDLM_CB_BLOCKING: {
286 LDLM_DEBUG(lock, "blocking AST on ID quota lock");
287 ldlm_lock2handle(lock, &lockh);
288 rc = ldlm_cli_cancel(&lockh);
291 case LDLM_CB_CANCELING: {
293 struct lquota_entry *lqe;
296 LDLM_DEBUG(lock, "canceling global quota lock");
297 lqe = qsd_id_ast_data_get(lock, true);
301 LQUOTA_DEBUG(lqe, "losing ID lock");
303 /* just local cancel (for stack clean up or eviction), don't
304 * release quota space in this case */
305 if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
310 /* allocate environment */
318 /* initialize environment */
319 rc = lu_env_init(env, LCT_DT_THREAD);
326 ldlm_lock2handle(lock, &lockh);
328 if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
329 /* Clear lqe_lockh & reset qunit to 0 */
330 qsd_set_qunit(lqe, 0);
331 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
332 lqe->lqe_edquot = false;
335 lqe_write_unlock(lqe);
337 /* If there is qqacq inflight, the release will be skipped
338 * at this time, and triggered on dqacq completion later,
339 * which means there could be a short window that slave is
340 * holding spare grant wihtout per-ID lock. */
342 rc = qsd_dqacq(env, lqe, QSD_REL);
344 /* release lqe reference grabbed by qsd_id_ast_data_get() */
351 LASSERTF(0, "invalid flags for blocking ast %d", flag);
358 * Glimpse callback handler for per-ID quota locks.
360 * \param lock - is the lock targeted by the glimpse
361 * \param data - is a pointer to the glimpse ptlrpc request
363 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
365 struct ptlrpc_request *req = data;
366 struct lquota_entry *lqe;
367 struct qsd_instance *qsd;
368 struct ldlm_gl_lquota_desc *desc;
369 struct lquota_lvb *lvb;
374 rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
378 lqe = qsd_id_ast_data_get(lock, false);
381 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
383 LQUOTA_CONSOLE(lqe, "glimpse on quota locks, new qunit:"LPU64,
386 qsd = lqe2qqi(lqe)->qqi_qsd;
390 if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
393 /* extract new qunit from glimpse request */
394 qsd_set_qunit(lqe, desc->gl_qunit);
396 space = lqe->lqe_granted - lqe->lqe_pending_rel;
397 space -= lqe->lqe_usage;
398 space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
399 space -= lqe->lqe_qunit;
402 if (lqe->lqe_pending_req > 0) {
403 LQUOTA_ERROR(lqe, "request in flight, postpone "
404 "release of "LPD64, space);
405 lvb->lvb_id_may_rel = space;
407 lqe->lqe_pending_req++;
409 /* release quota space in glimpse reply */
410 LQUOTA_ERROR(lqe, "releasing "LPD64, space);
411 lqe->lqe_granted -= space;
412 lvb->lvb_id_rel = space;
414 lqe_write_unlock(lqe);
415 /* change the lqe_granted */
416 qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
417 (union lquota_rec *)&lqe->lqe_granted,
421 lqe->lqe_pending_req--;
427 lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
428 lqe_write_unlock(lqe);
431 cfs_waitq_broadcast(&lqe->lqe_waiters);
438 struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
446 * Check whether a slave already own a ldlm lock for the quota identifier \qid.
448 * \param lockh - is the local lock handle from lquota entry.
449 * \param rlockh - is the remote lock handle of the matched lock, if any.
451 * \retval 0 : on successful look up and \lockh contains the lock handle.
452 * \retval -ENOENT: no lock found
454 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
456 struct ldlm_lock *lock;
462 if (!lustre_handle_is_used(lockh))
465 rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
469 LASSERT(lustre_handle_is_used(lockh));
470 ldlm_lock_dump_handle(D_QUOTA, lockh);
475 /* look up lock associated with local handle and extract remote handle
476 * to be packed in quota request */
477 lock = ldlm_handle2lock(lockh);
478 LASSERT(lock != NULL);
479 lustre_handle_copy(rlockh, &lock->l_remote_handle);
485 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
487 struct qsd_thread_info *qti = qsd_info(env);
492 if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
493 lqe->lqe_usage || lqe->lqe_granted) {
494 lqe_write_unlock(lqe);
498 lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
499 if (lustre_handle_is_used(&qti->qti_lockh)) {
500 memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
501 qsd_set_qunit(lqe, 0);
502 lqe->lqe_edquot = false;
504 lqe_write_unlock(lqe);
506 rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
510 ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);