Whamcloud - gitweb
LU-2211 quota: cap how long a thread can wait for quota
[fs/lustre-release.git] / lustre / quota / qsd_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012 Intel, Inc.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_net.h>
38 #include <lustre_import.h>
39 #include <lustre_dlm.h>
40 #include <obd_class.h>
41
42 #include "qsd_internal.h"
43
/*
 * Per-request context for the quota RPCs issued from this file.
 *
 * An instance lives inside the request's rq_async_args area (the senders
 * CLASSERT that it fits) and is handed back to the interpret callbacks
 * through their \a arg parameter.
 */
struct qsd_async_args {
        /* export the request was sent on; used to finish the lock enqueue */
        struct obd_export     *aa_exp;
        /* qsd_qtype_info forwarded to the completion callback */
        struct qsd_qtype_info *aa_qqi;
        /* opaque argument forwarded to the completion callback */
        void                  *aa_arg;
        /* lvb buffer for the lock; only set by the intent lock path */
        union ldlm_wire_lvb   *aa_lvb;
        /* copy of the lock handle associated with the request */
        struct lustre_handle   aa_lockh;
        /* completion callback run by the interpret functions */
        qsd_req_completion_t   aa_completion;
};
52
53 /*
54  * non-intent quota request interpret callback.
55  *
56  * \param env    - the environment passed by the caller
57  * \param req    - the non-intent quota request
58  * \param arg    - qsd_async_args
59  * \param rc     - request status
60  *
61  * \retval 0     - success
62  * \retval -ve   - appropriate errors
63  */
64 static int qsd_dqacq_interpret(const struct lu_env *env,
65                                struct ptlrpc_request *req, void *arg, int rc)
66 {
67         struct quota_body     *rep_qbody = NULL, *req_qbody;
68         struct qsd_async_args *aa = (struct qsd_async_args *)arg;
69         ENTRY;
70
71         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
72         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
73                 rep_qbody = req_capsule_server_get(&req->rq_pill,
74                                                    &RMF_QUOTA_BODY);
75         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
76                           NULL, aa->aa_arg, rc);
77         RETURN(rc);
78 }
79
80 /*
81  * Send non-intent quota request to master.
82  *
83  * \param env    - the environment passed by the caller
84  * \param exp    - is the export to use to send the acquire RPC
85  * \param qbody  - quota body to be packed in request
86  * \param sync   - synchronous or asynchronous
87  * \param completion - completion callback
88  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
89  *                 function
90  * \param lqe    - is the qid entry to be processed
91  *
92  * \retval 0     - success
93  * \retval -ve   - appropriate errors
94  */
95 int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
96                    struct quota_body *qbody, bool sync,
97                    qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
98                    struct lustre_handle *lockh, struct lquota_entry *lqe)
99 {
100         struct ptlrpc_request   *req;
101         struct quota_body       *req_qbody;
102         struct qsd_async_args   *aa;
103         int                      rc;
104         ENTRY;
105
106         LASSERT(exp);
107
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
109         if (req == NULL)
110                 GOTO(out, rc = -ENOMEM);
111
112         req->rq_no_resend = req->rq_no_delay = 1;
113         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
114         if (rc) {
115                 ptlrpc_request_free(req);
116                 GOTO(out, rc);
117         }
118
119         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
120         *req_qbody = *qbody;
121
122         ptlrpc_request_set_replen(req);
123
124         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
125         aa = ptlrpc_req_async_args(req);
126         aa->aa_exp = exp;
127         aa->aa_qqi = qqi;
128         aa->aa_arg = (void *)lqe;
129         aa->aa_completion = completion;
130         lustre_handle_copy(&aa->aa_lockh, lockh);
131
132         if (sync) {
133                 rc = ptlrpc_queue_wait(req);
134                 rc = qsd_dqacq_interpret(env, req, aa, rc);
135                 ptlrpc_req_finished(req);
136         } else {
137                 req->rq_interpret_reply = qsd_dqacq_interpret;
138                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
139         }
140
141         RETURN(rc);
142 out:
143         completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
144         return rc;
145 }
146
147 /*
148  * intent quota request interpret callback.
149  *
150  * \param env    - the environment passed by the caller
151  * \param req    - the intent quota request
152  * \param arg    - qsd_async_args
153  * \param rc     - request status
154  *
155  * \retval 0     - success
156  * \retval -ve   - appropriate errors
157  */
158 static int qsd_intent_interpret(const struct lu_env *env,
159                                 struct ptlrpc_request *req, void *arg, int rc)
160 {
161         struct lustre_handle     *lockh;
162         struct quota_body        *rep_qbody = NULL, *req_qbody;
163         struct ldlm_intent       *lit;
164         struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
165         int                       flags = LDLM_FL_HAS_INTENT;
166         ENTRY;
167
168         LASSERT(aa->aa_exp);
169         lockh = &aa->aa_lockh;
170         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
171         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
172
173         rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
174                                    &flags, (void *)aa->aa_lvb,
175                                    sizeof(union ldlm_wire_lvb), lockh, rc);
176         if (rc < 0)
177                 /* the lock has been destroyed, forget about the lock handle */
178                 memset(lockh, 0, sizeof(*lockh));
179
180         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
181                 rep_qbody = req_capsule_server_get(&req->rq_pill,
182                                                    &RMF_QUOTA_BODY);
183
184         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
185                           aa->aa_lvb, aa->aa_arg, rc);
186         RETURN(rc);
187 }
188
189 /*
190  * Get intent per-ID lock or global-index lock from master.
191  *
192  * \param env    - the environment passed by the caller
193  * \param exp    - is the export to use to send the intent RPC
194  * \param qbody  - quota body to be packed in request
195  * \param sync   - synchronous or asynchronous (pre-acquire)
196  * \param it_op  - IT_QUOTA_DQACQ or IT_QUOTA_CONN
197  * \param completion - completion callback
198  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
199  *                 function
200  * \param lvb    - is the lvb associated with the lock and returned by the
201  *                 server
202  * \param arg    - is an opaq argument passed to the completion callback
203  *
204  * \retval 0     - success
205  * \retval -ve   - appropriate errors
206  */
207 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
208                     struct quota_body *qbody, bool sync, int it_op,
209                     qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
210                     union ldlm_wire_lvb *lvb, void *arg)
211 {
212         struct qsd_thread_info  *qti = qsd_info(env);
213         struct ptlrpc_request   *req;
214         struct qsd_async_args   *aa = NULL;
215         struct ldlm_intent      *lit;
216         struct quota_body       *req_qbody;
217         int                      rc, flags = LDLM_FL_HAS_INTENT;
218         ENTRY;
219
220         LASSERT(exp != NULL);
221         LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
222
223         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
224
225         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
226                                    &RQF_LDLM_INTENT_QUOTA);
227         if (req == NULL)
228                 GOTO(out, rc = -ENOMEM);
229
230         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
231         if (rc) {
232                 ptlrpc_request_free(req);
233                 GOTO(out, rc);
234         }
235
236         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
237         lit->opc = (__u64)it_op;
238
239         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
240         *req_qbody = *qbody;
241
242         ptlrpc_request_set_replen(req);
243
244         switch(it_op) {
245         case IT_QUOTA_CONN:
246                 /* build resource name associated with global index */
247                 fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
248
249                 /* copy einfo template and fill ei_cbdata with qqi pointer */
250                 memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
251                 qti->qti_einfo.ei_cbdata = qqi;
252
253                 /* don't cancel global lock on memory pressure */
254                 flags |= LDLM_FL_NO_LRU;
255                 break;
256         case IT_QUOTA_DQACQ:
257                 /* build resource name associated for per-ID quota lock */
258                 fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
259                                       &qti->qti_resid);
260
261                 /* copy einfo template and fill ei_cbdata with lqe pointer */
262                 memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
263                 qti->qti_einfo.ei_cbdata = arg;
264                 break;
265         default:
266                 LASSERTF(0, "invalid it_op %d", it_op);
267         }
268
269         /* build lock enqueue request */
270         rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
271                               &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
272                               1);
273         if (rc < 0) {
274                 ptlrpc_req_finished(req);
275                 GOTO(out, rc);
276         }
277
278         /* grab reference on backend structure for the new lock */
279         switch(it_op) {
280         case IT_QUOTA_CONN:
281                 /* grab reference on qqi for new lock */
282 #ifdef USE_LU_REF
283                 struct ldlm_lock        *lock;
284
285                 lock = ldlm_handle2lock(&qti->qti_lockh);
286                 if (lock == NULL) {
287                         ptlrpc_req_finished(req);
288                         GOTO(out, -ENOLCK);
289                 }
290                 lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
291                 LDLM_LOCK_PUT(lock);
292 #endif
293                 qqi_getref(qqi);
294                 break;
295         case IT_QUOTA_DQACQ:
296                 /* grab reference on lqe for new lock */
297                 lqe_getref((struct lquota_entry *)arg);
298                 /* all acquire/release request are sent with no_resend and
299                  * no_delay flag */
300                 req->rq_no_resend = req->rq_no_delay = 1;
301                 break;
302         default:
303                 break;
304         }
305
306         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
307         aa = ptlrpc_req_async_args(req);
308         aa->aa_exp = exp;
309         aa->aa_qqi = qqi;
310         aa->aa_arg = arg;
311         aa->aa_lvb = lvb;
312         aa->aa_completion = completion;
313         lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
314
315         if (sync) {
316                 /* send lock enqueue request and wait for completion */
317                 rc = ptlrpc_queue_wait(req);
318                 rc = qsd_intent_interpret(env, req, aa, rc);
319                 ptlrpc_req_finished(req);
320         } else {
321                 /* queue lock request and return */
322                 req->rq_interpret_reply = qsd_intent_interpret;
323                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
324         }
325
326         RETURN(rc);
327 out:
328         completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
329         return rc;
330 }
331
332 /*
333  * Fetch a global or slave index from the QMT.
334  *
335  * \param env    - the environment passed by the caller
336  * \param exp    - is the export to use to issue the OBD_IDX_READ RPC
337  * \param ii     - is the index information to be packed in the request
338  *                 on success, the index information returned by the server
339  *                 is copied there.
340  * \param npages - is the number of pages in the pages array
341  * \param pages  - is an array of @npages pages
342  *
343  * \retval 0     - success
344  * \retval -ve   - appropriate errors
345  */
346 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
347                     struct idx_info *ii, unsigned int npages,
348                     cfs_page_t **pages, bool *need_swab)
349 {
350         struct ptlrpc_request   *req;
351         struct idx_info         *req_ii;
352         struct ptlrpc_bulk_desc *desc;
353         int                      rc, i;
354         ENTRY;
355
356         LASSERT(exp);
357
358         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
359         if (req == NULL)
360                 RETURN(-ENOMEM);
361
362         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
363         if (rc) {
364                 ptlrpc_request_free(req);
365                 RETURN(rc);
366         }
367
368         req->rq_request_portal = MDS_READPAGE_PORTAL;
369         ptlrpc_at_set_req_timeout(req);
370
371         /* allocate bulk descriptor */
372         desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
373                                     MDS_BULK_PORTAL);
374         if (desc == NULL) {
375                 ptlrpc_request_free(req);
376                 RETURN(-ENOMEM);
377         }
378
379         /* req now owns desc and will free it when it gets freed */
380         for (i = 0; i < npages; i++)
381                 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, CFS_PAGE_SIZE);
382
383         /* pack index information in request */
384         req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
385         *req_ii = *ii;
386
387         ptlrpc_request_set_replen(req);
388
389         /* send request to master and wait for RPC to complete */
390         rc = ptlrpc_queue_wait(req);
391         if (rc)
392                 GOTO(out, rc);
393
394         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
395                                           req->rq_bulk->bd_nob_transferred);
396         if (rc < 0)
397                 GOTO(out, rc);
398         rc = 0;
399
400         req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
401         *ii = *req_ii;
402
403         *need_swab = ptlrpc_rep_need_swab(req);
404
405         EXIT;
406 out:
407         ptlrpc_req_finished(req);
408         return rc;
409 }