4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2016, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
31 #define DEBUG_SUBSYSTEM S_LQUOTA
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_dlm.h>
36 #include <obd_class.h>
38 #include "qsd_internal.h"
/*
 * Per-request context preserved across an asynchronous quota RPC so that
 * the interpret callback can invoke the completion handler with the right
 * state.  NOTE(review): this view of the file is fragmented -- at least an
 * aa_arg member (used below as aa->aa_arg) and the closing brace are not
 * visible here.
 */
40 struct qsd_async_args {
/* export the request was sent through */
41 struct obd_export *aa_exp;
/* quota-type info forwarded to the completion callback */
42 struct qsd_qtype_info *aa_qqi;
/* lvb buffer associated with an intent lock request */
44 struct lquota_lvb *aa_lvb;
/* lock handle; zeroed by the interpret callback if the lock is gone */
45 struct lustre_handle aa_lockh;
/* callback invoked once the reply (or the failure) has been processed */
46 qsd_req_completion_t aa_completion;
50 * non-intent quota request interpret callback.
52 * \param env - the environment passed by the caller
53 * \param req - the non-intent quota request
54 * \param arg - qsd_async_args
55 * \param rc - request status
58 * \retval -ve - appropriate errors
60 static int qsd_dqacq_interpret(const struct lu_env *env,
61 struct ptlrpc_request *req, void *arg, int rc)
63 struct quota_body *rep_qbody = NULL, *req_qbody;
64 struct qsd_async_args *aa = (struct qsd_async_args *)arg;
67 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
/* A reply quota body is only extracted when the request succeeded or
 * failed with -EDQUOT/-EINPROGRESS, the cases where the server still
 * ships a meaningful body; otherwise rep_qbody stays NULL. */
68 if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
69 rep_qbody = req_capsule_server_get(&req->rq_pill,
/* hand request & reply bodies to the completion callback; lvb argument is
 * NULL since non-intent requests carry no lock value block */
71 aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
72 NULL, aa->aa_arg, rc);
77 * Send non-intent quota request to master.
79 * \param env - the environment passed by the caller
80 * \param exp - is the export to use to send the acquire RPC
81 * \param qbody - quota body to be packed in request
82 * \param sync - synchronous or asynchronous
83 * \param completion - completion callback
84 * \param qqi - is the qsd_qtype_info structure to pass to the completion
86 * \param lqe - is the qid entry to be processed
89 * \retval -ve - appropriate errors
91 int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
92 struct quota_body *qbody, bool sync,
93 qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
94 struct lustre_handle *lockh, struct lquota_entry *lqe)
96 struct ptlrpc_request *req;
97 struct quota_body *req_qbody;
98 struct qsd_async_args *aa;
104 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
/* allocation failure path (condition line not visible in this view) */
106 GOTO(out, rc = -ENOMEM);
/* acquire/release must not linger: no resend, no delay, and do not retry
 * transparently on -EINPROGRESS (the quota layer handles that itself) */
108 req->rq_no_resend = req->rq_no_delay = 1;
109 req->rq_no_retry_einprogress = 1;
110 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
/* packing failed: release the request (error branch; condition line not
 * visible here) */
112 ptlrpc_request_free(req);
116 req->rq_request_portal = MDS_READPAGE_PORTAL;
/* copy caller's quota body into the request buffer */
117 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
120 ptlrpc_request_set_replen(req);
/* stash completion context in the request's async-args area */
122 aa = ptlrpc_req_async_args(aa, req);
125 aa->aa_arg = (void *)lqe;
126 aa->aa_completion = completion;
127 lustre_handle_copy(&aa->aa_lockh, lockh);
/* synchronous mode: wait for the reply and run the interpret callback
 * inline (the if (sync) line is not visible in this view) */
130 rc = ptlrpc_queue_wait(req);
131 rc = qsd_dqacq_interpret(env, req, aa, rc);
132 ptlrpc_req_finished(req);
/* asynchronous mode: let ptlrpcd drive the request and invoke
 * qsd_dqacq_interpret on completion */
134 req->rq_interpret_reply = qsd_dqacq_interpret;
135 ptlrpcd_add_req(req);
/* error path: notify the caller through the completion callback so the
 * pending request accounting is always balanced */
140 completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
145 * intent quota request interpret callback.
147 * \param env - the environment passed by the caller
148 * \param req - the intent quota request
149 * \param arg - qsd_async_args
150 * \param rc - request status
152 * \retval 0 - success
153 * \retval -ve - appropriate errors
155 static int qsd_intent_interpret(const struct lu_env *env,
156 struct ptlrpc_request *req, void *arg, int rc)
158 struct lustre_handle *lockh;
159 struct quota_body *rep_qbody = NULL, *req_qbody;
160 struct qsd_async_args *aa = (struct qsd_async_args *)arg;
161 struct ldlm_reply *lockrep;
162 __u64 flags = LDLM_FL_HAS_INTENT;
163 struct ldlm_enqueue_info einfo = {
164 .ei_type = LDLM_PLAIN,
170 lockh = &aa->aa_lockh;
171 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
172 req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
/* finish the LDLM enqueue: updates the lock with reply state and returns
 * the enqueue status */
174 rc = ldlm_cli_enqueue_fini(aa->aa_exp, &req->rq_pill, &einfo, 0, &flags,
175 aa->aa_lvb, sizeof(*(aa->aa_lvb)),
178 /* the lock has been destroyed, forget about the lock handle */
179 memset(lockh, 0, sizeof(*lockh));
181 * To avoid the server being flooded with LDLM locks, the server
182 * may reject the locking request by returning -EINPROGRESS,
183 * this is different from the -EINPROGRESS returned by quota
186 if (rc == -EINPROGRESS)
/* extract the intent result embedded in the DLM reply */
191 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
192 LASSERT(lockrep != NULL);
193 rc = ptlrpc_status_ntoh(lockrep->lock_policy_res2);
/* as in the non-intent path, only these statuses carry a reply body */
195 if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
196 rep_qbody = req_capsule_server_get(&req->rq_pill,
199 aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
200 aa->aa_lvb, aa->aa_arg, rc);
205 * Get intent per-ID lock or global-index lock from master.
207 * \param env - the environment passed by the caller
208 * \param exp - is the export to use to send the intent RPC
209 * \param qbody - quota body to be packed in request
210 * \param sync - synchronous or asynchronous (pre-acquire)
211 * \param it_op - IT_QUOTA_DQACQ or IT_QUOTA_CONN
212 * \param completion - completion callback
213 * \param qqi - is the qsd_qtype_info structure to pass to the completion
215 * \param lvb - is the lvb associated with the lock and returned by the
217 * \param arg - is an opaq argument passed to the completion callback
219 * \retval 0 - success
220 * \retval -ve - appropriate errors
222 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
223 struct quota_body *qbody, bool sync, int it_op,
224 qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
225 struct lquota_lvb *lvb, void *arg)
227 struct qsd_thread_info *qti = qsd_info(env);
228 struct ptlrpc_request *req;
229 struct qsd_async_args *aa = NULL;
230 struct ldlm_intent *lit;
231 struct quota_body *req_qbody;
232 __u64 flags = LDLM_FL_HAS_INTENT;
236 LASSERT(exp != NULL);
/* caller must not already hold a lock for this body */
237 LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
238 memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
241 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
242 &RQF_LDLM_INTENT_QUOTA);
/* allocation failure path (condition line not visible in this view) */
244 GOTO(out, rc = -ENOMEM);
/* don't retry transparently on -EINPROGRESS; quota layer handles it */
246 req->rq_no_retry_einprogress = 1;
247 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
/* enqueue preparation failed (error branch; condition not visible) */
249 ptlrpc_request_free(req);
252 req->rq_request_portal = MDS_READPAGE_PORTAL;
/* pack the intent opcode (IT_QUOTA_DQACQ or IT_QUOTA_CONN) */
254 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
255 lit->opc = (__u64)it_op;
257 req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
/* reserve room for the lvb the server returns in the reply */
260 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
262 ptlrpc_request_set_replen(req);
/* IT_QUOTA_CONN case (switch/case lines not visible in this view): */
266 /* build resource name associated with global index */
267 fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
269 /* copy einfo template and fill ei_cbdata with qqi pointer */
270 memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
271 qti->qti_einfo.ei_cbdata = qqi;
273 /* don't cancel global lock on memory pressure */
274 flags |= LDLM_FL_NO_LRU;
/* IT_QUOTA_DQACQ case: */
277 /* build resource name associated with per-ID quota lock */
278 fid_build_quota_res_name(&qbody->qb_fid, &qbody->qb_id,
281 /* copy einfo template and fill ei_cbdata with lqe pointer */
282 memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
283 qti->qti_einfo.ei_cbdata = arg;
/* any other it_op is a programming error */
286 LASSERTF(0, "invalid it_op %d\n", it_op);
289 /* build lock enqueue request */
290 rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
291 &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA,
/* enqueue failed: drop the request (error branch; condition not visible) */
294 ptlrpc_req_finished(req);
298 /* grab reference on backend structure for the new lock */
301 /* grab reference on qqi for new lock */
302 #ifdef CONFIG_LUSTRE_DEBUG_LU_REF
304 struct ldlm_lock *lock;
306 lock = ldlm_handle2lock(&qti->qti_lockh);
/* lock vanished between enqueue and lookup; bail out */
308 ptlrpc_req_finished(req);
309 GOTO(out, rc = -ENOLCK);
311 lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
318 /* grab reference on lqe for new lock */
319 lqe_getref((struct lquota_entry *)arg);
320 /* all acquire/release request are sent with no_resend and
322 req->rq_no_resend = req->rq_no_delay = 1;
/* stash completion context in the request's async-args area */
328 aa = ptlrpc_req_async_args(aa, req);
333 aa->aa_completion = completion;
334 lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
/* synchronous mode: wait and interpret inline */
337 /* send lock enqueue request and wait for completion */
338 rc = ptlrpc_queue_wait(req);
339 rc = qsd_intent_interpret(env, req, aa, rc);
340 ptlrpc_req_finished(req);
/* asynchronous mode: let ptlrpcd run qsd_intent_interpret on reply */
342 /* queue lock request and return */
343 req->rq_interpret_reply = qsd_intent_interpret;
344 ptlrpcd_add_req(req);
/* error path: report failure through the completion callback */
349 completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
354 * Fetch a global or slave index from the QMT.
356 * \param env - the environment passed by the caller
357 * \param exp - is the export to use to issue the OBD_IDX_READ RPC
358 * \param ii - is the index information to be packed in the request
359 * on success, the index information returned by the server
361 * \param npages - is the number of pages in the pages array
362 * \param pages - is an array of @npages pages
364 * \retval 0 - success
365 * \retval -ve - appropriate errors
367 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
368 struct idx_info *ii, unsigned int npages,
369 struct page **pages, bool *need_swab)
371 struct ptlrpc_request *req;
372 struct idx_info *req_ii;
373 struct ptlrpc_bulk_desc *desc;
379 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
383 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
/* packing failed: release the request (error branch; condition line not
 * visible in this view) */
385 ptlrpc_request_free(req);
389 req->rq_request_portal = MDS_READPAGE_PORTAL;
390 ptlrpc_at_set_req_timeout(req);
392 /* allocate bulk descriptor */
393 desc = ptlrpc_prep_bulk_imp(req, npages, 1,
394 PTLRPC_BULK_PUT_SINK,
396 &ptlrpc_bulk_kiov_pin_ops);
/* descriptor allocation failed (condition line not visible) */
398 GOTO(out, rc = -ENOMEM);
400 /* req now owns desc and will free it when it gets freed */
401 for (i = 0; i < npages; i++)
402 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
405 /* pack index information in request */
406 req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
409 ptlrpc_request_set_replen(req);
411 /* send request to master and wait for RPC to complete */
412 rc = ptlrpc_queue_wait(req);
/* verify/unwrap the bulk data actually transferred */
416 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
417 req->rq_bulk->bd_nob_transferred);
421 /* sptlrpc_cli_unwrap_bulk_read() returns the number of bytes
/* copy back the index information updated by the server */
425 req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
/* tell the caller whether the bulk pages need byte-swapping */
428 *need_swab = req_capsule_rep_need_swab(&req->rq_pill);
/* common exit: release the request (and the bulk desc it owns) */
432 ptlrpc_req_finished(req);