4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012 Intel, Inc.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_LQUOTA
37 #include <lustre_net.h>
38 #include <lustre_import.h>
39 #include <lustre_dlm.h>
40 #include <obd_class.h>
42 #include "qsd_internal.h"
44 struct qsd_async_args {
45 struct obd_export *aa_exp;
46 struct qsd_qtype_info *aa_qqi;
48 union ldlm_wire_lvb *aa_lvb;
49 struct lustre_handle aa_lockh;
50 qsd_req_completion_t aa_completion;
54 * non-intent quota request interpret callback.
56 * \param env - the environment passed by the caller
57 * \param req - the non-intent quota request
58 * \param arg - qsd_async_args
59 * \param rc - request status
62 * \retval -ve - appropriate errors
64 static int qsd_dqacq_interpret(const struct lu_env *env,
65 struct ptlrpc_request *req, void *arg, int rc)
67 struct quota_body *rep_qbody = NULL, *req_qbody;
68 struct qsd_async_args *aa = (struct qsd_async_args *)arg;
71 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
72 if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
73 rep_qbody = req_capsule_server_get(&req->rq_pill,
75 aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
76 NULL, aa->aa_arg, rc);
81 * Send non-intent quota request to master.
83 * \param env - the environment passed by the caller
84 * \param exp - is the export to use to send the acquire RPC
85 * \param qbody - quota body to be packed in request
86 * \param sync - synchronous or asynchronous
87 * \param completion - completion callback
88 * \param qqi - is the qsd_qtype_info structure to pass to the completion
90 * \param lqe - is the qid entry to be processed
93 * \retval -ve - appropriate errors
95 int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
96 struct quota_body *qbody, bool sync,
97 qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
98 struct lustre_handle *lockh, struct lquota_entry *lqe)
100 struct ptlrpc_request *req;
101 struct quota_body *req_qbody;
102 struct qsd_async_args *aa;
108 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
110 GOTO(out, rc = -ENOMEM);
112 req->rq_no_resend = req->rq_no_delay = 1;
113 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
115 ptlrpc_request_free(req);
119 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
122 ptlrpc_request_set_replen(req);
124 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
125 aa = ptlrpc_req_async_args(req);
128 aa->aa_arg = (void *)lqe;
129 aa->aa_completion = completion;
130 lustre_handle_copy(&aa->aa_lockh, lockh);
133 rc = ptlrpc_queue_wait(req);
134 rc = qsd_dqacq_interpret(env, req, aa, rc);
135 ptlrpc_req_finished(req);
137 req->rq_interpret_reply = qsd_dqacq_interpret;
138 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
143 completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
148 * intent quota request interpret callback.
150 * \param env - the environment passed by the caller
151 * \param req - the intent quota request
152 * \param arg - qsd_async_args
153 * \param rc - request status
155 * \retval 0 - success
156 * \retval -ve - appropriate errors
158 static int qsd_intent_interpret(const struct lu_env *env,
159 struct ptlrpc_request *req, void *arg, int rc)
161 struct lustre_handle *lockh;
162 struct quota_body *rep_qbody = NULL, *req_qbody;
163 struct ldlm_intent *lit;
164 struct qsd_async_args *aa = (struct qsd_async_args *)arg;
165 int flags = LDLM_FL_HAS_INTENT;
169 lockh = &aa->aa_lockh;
170 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
171 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
173 rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
174 &flags, (void *)aa->aa_lvb,
175 sizeof(union ldlm_wire_lvb), lockh, rc);
177 /* the lock has been destroyed, forget about the lock handle */
178 memset(lockh, 0, sizeof(*lockh));
180 if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
181 rep_qbody = req_capsule_server_get(&req->rq_pill,
184 aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
185 aa->aa_lvb, aa->aa_arg, rc);
190 * Get intent per-ID lock or global-index lock from master.
192 * \param env - the environment passed by the caller
193 * \param exp - is the export to use to send the intent RPC
194 * \param qbody - quota body to be packed in request
195 * \param sync - synchronous or asynchronous (pre-acquire)
196 * \param it_op - IT_QUOTA_DQACQ or IT_QUOTA_CONN
197 * \param completion - completion callback
198 * \param qqi - is the qsd_qtype_info structure to pass to the completion
200 * \param lvb - is the lvb associated with the lock and returned by the
202 * \param arg - is an opaq argument passed to the completion callback
204 * \retval 0 - success
205 * \retval -ve - appropriate errors
207 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
208 struct quota_body *qbody, bool sync, int it_op,
209 qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
210 union ldlm_wire_lvb *lvb, void *arg)
212 struct qsd_thread_info *qti = qsd_info(env);
213 struct ptlrpc_request *req;
214 struct qsd_async_args *aa = NULL;
215 struct ldlm_intent *lit;
216 struct quota_body *req_qbody;
217 int rc, flags = LDLM_FL_HAS_INTENT;
220 LASSERT(exp != NULL);
221 LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
223 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
225 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
226 &RQF_LDLM_INTENT_QUOTA);
228 GOTO(out, rc = -ENOMEM);
230 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
232 ptlrpc_request_free(req);
236 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
237 lit->opc = (__u64)it_op;
239 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
242 ptlrpc_request_set_replen(req);
246 /* build resource name associated with global index */
247 fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
249 /* copy einfo template and fill ei_cbdata with qqi pointer */
250 memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
251 qti->qti_einfo.ei_cbdata = qqi;
253 /* don't cancel global lock on memory pressure */
254 flags |= LDLM_FL_NO_LRU;
257 /* build resource name associated for per-ID quota lock */
258 fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
261 /* copy einfo template and fill ei_cbdata with lqe pointer */
262 memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
263 qti->qti_einfo.ei_cbdata = arg;
266 LASSERTF(0, "invalid it_op %d", it_op);
269 /* build lock enqueue request */
270 rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
271 &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
274 ptlrpc_req_finished(req);
278 /* grab reference on backend structure for the new lock */
281 /* grab reference on qqi for new lock */
283 struct ldlm_lock *lock;
285 lock = ldlm_handle2lock(&qti->qti_lockh);
287 ptlrpc_req_finished(req);
290 lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
296 /* grab reference on lqe for new lock */
297 lqe_getref((struct lquota_entry *)arg);
298 /* all acquire/release request are sent with no_resend and
300 req->rq_no_resend = req->rq_no_delay = 1;
306 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
307 aa = ptlrpc_req_async_args(req);
312 aa->aa_completion = completion;
313 lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
316 /* send lock enqueue request and wait for completion */
317 rc = ptlrpc_queue_wait(req);
318 rc = qsd_intent_interpret(env, req, aa, rc);
319 ptlrpc_req_finished(req);
321 /* queue lock request and return */
322 req->rq_interpret_reply = qsd_intent_interpret;
323 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
328 completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
333 * Fetch a global or slave index from the QMT.
335 * \param env - the environment passed by the caller
336 * \param exp - is the export to use to issue the OBD_IDX_READ RPC
337 * \param ii - is the index information to be packed in the request
338 * on success, the index information returned by the server
340 * \param npages - is the number of pages in the pages array
341 * \param pages - is an array of @npages pages
343 * \retval 0 - success
344 * \retval -ve - appropriate errors
346 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
347 struct idx_info *ii, unsigned int npages,
348 cfs_page_t **pages, bool *need_swab)
350 struct ptlrpc_request *req;
351 struct idx_info *req_ii;
352 struct ptlrpc_bulk_desc *desc;
358 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
362 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
364 ptlrpc_request_free(req);
368 req->rq_request_portal = MDS_READPAGE_PORTAL;
369 ptlrpc_at_set_req_timeout(req);
371 /* allocate bulk descriptor */
372 desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
375 ptlrpc_request_free(req);
379 /* req now owns desc and will free it when it gets freed */
380 for (i = 0; i < npages; i++)
381 ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
383 /* pack index information in request */
384 req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
387 ptlrpc_request_set_replen(req);
389 /* send request to master and wait for RPC to complete */
390 rc = ptlrpc_queue_wait(req);
394 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
395 req->rq_bulk->bd_nob_transferred);
400 req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
403 *need_swab = ptlrpc_rep_need_swab(req);
407 ptlrpc_req_finished(req);