Whamcloud - gitweb
LU-9286 ptlrpc: fix wrong error handlers
[fs/lustre-release.git] / lustre / quota / qsd_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2016, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_dlm.h>
36 #include <obd_class.h>
37
38 #include "qsd_internal.h"
39
40 struct qsd_async_args {
41         struct obd_export     *aa_exp;
42         struct qsd_qtype_info *aa_qqi;
43         void                  *aa_arg;
44         struct lquota_lvb     *aa_lvb;
45         struct lustre_handle   aa_lockh;
46         qsd_req_completion_t   aa_completion;
47 };
48
49 /*
50  * non-intent quota request interpret callback.
51  *
52  * \param env    - the environment passed by the caller
53  * \param req    - the non-intent quota request
54  * \param arg    - qsd_async_args
55  * \param rc     - request status
56  *
57  * \retval 0     - success
58  * \retval -ve   - appropriate errors
59  */
60 static int qsd_dqacq_interpret(const struct lu_env *env,
61                                struct ptlrpc_request *req, void *arg, int rc)
62 {
63         struct quota_body     *rep_qbody = NULL, *req_qbody;
64         struct qsd_async_args *aa = (struct qsd_async_args *)arg;
65         ENTRY;
66
67         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
68         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
69                 rep_qbody = req_capsule_server_get(&req->rq_pill,
70                                                    &RMF_QUOTA_BODY);
71         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
72                           NULL, aa->aa_arg, rc);
73         RETURN(rc);
74 }
75
76 /*
77  * Send non-intent quota request to master.
78  *
79  * \param env    - the environment passed by the caller
80  * \param exp    - is the export to use to send the acquire RPC
81  * \param qbody  - quota body to be packed in request
82  * \param sync   - synchronous or asynchronous
83  * \param completion - completion callback
84  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
85  *                 function
86  * \param lqe    - is the qid entry to be processed
87  *
88  * \retval 0     - success
89  * \retval -ve   - appropriate errors
90  */
91 int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
92                    struct quota_body *qbody, bool sync,
93                    qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
94                    struct lustre_handle *lockh, struct lquota_entry *lqe)
95 {
96         struct ptlrpc_request   *req;
97         struct quota_body       *req_qbody;
98         struct qsd_async_args   *aa;
99         int                      rc;
100         ENTRY;
101
102         LASSERT(exp);
103
104         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
105         if (req == NULL)
106                 GOTO(out, rc = -ENOMEM);
107
108         req->rq_no_resend = req->rq_no_delay = 1;
109         req->rq_no_retry_einprogress = 1;
110         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
111         if (rc) {
112                 ptlrpc_request_free(req);
113                 GOTO(out, rc);
114         }
115
116         req->rq_request_portal = MDS_READPAGE_PORTAL;
117         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
118         *req_qbody = *qbody;
119
120         ptlrpc_request_set_replen(req);
121
122         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
123         aa = ptlrpc_req_async_args(req);
124         aa->aa_exp = exp;
125         aa->aa_qqi = qqi;
126         aa->aa_arg = (void *)lqe;
127         aa->aa_completion = completion;
128         lustre_handle_copy(&aa->aa_lockh, lockh);
129
130         if (sync) {
131                 rc = ptlrpc_queue_wait(req);
132                 rc = qsd_dqacq_interpret(env, req, aa, rc);
133                 ptlrpc_req_finished(req);
134         } else {
135                 req->rq_interpret_reply = qsd_dqacq_interpret;
136                 ptlrpcd_add_req(req);
137         }
138
139         RETURN(rc);
140 out:
141         completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
142         return rc;
143 }
144
145 /*
146  * intent quota request interpret callback.
147  *
148  * \param env    - the environment passed by the caller
149  * \param req    - the intent quota request
150  * \param arg    - qsd_async_args
151  * \param rc     - request status
152  *
153  * \retval 0     - success
154  * \retval -ve   - appropriate errors
155  */
156 static int qsd_intent_interpret(const struct lu_env *env,
157                                 struct ptlrpc_request *req, void *arg, int rc)
158 {
159         struct lustre_handle     *lockh;
160         struct quota_body        *rep_qbody = NULL, *req_qbody;
161         struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
162         struct ldlm_reply        *lockrep;
163         __u64                     flags = LDLM_FL_HAS_INTENT;
164         ENTRY;
165
166         LASSERT(aa->aa_exp);
167         lockh = &aa->aa_lockh;
168         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
169         req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
170
171         rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
172                                    &flags, (void *)aa->aa_lvb,
173                                    sizeof(struct lquota_lvb), lockh, rc);
174         if (rc < 0) {
175                 /* the lock has been destroyed, forget about the lock handle */
176                 memset(lockh, 0, sizeof(*lockh));
177                 /*
178                  * To avoid the server being fullfilled by LDLM locks, server
179                  * may reject the locking request by returning -EINPROGRESS,
180                  * this is different from the -EINPROGRESS returned by quota
181                  * code.
182                  */
183                 if (rc == -EINPROGRESS)
184                         rc = -EAGAIN;
185                 GOTO(out, rc);
186         }
187
188         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
189         LASSERT(lockrep != NULL);
190         rc = ptlrpc_status_ntoh(lockrep->lock_policy_res2);
191
192         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
193                 rep_qbody = req_capsule_server_get(&req->rq_pill,
194                                                    &RMF_QUOTA_BODY);
195 out:
196         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
197                           aa->aa_lvb, aa->aa_arg, rc);
198         RETURN(rc);
199 }
200
201 /*
202  * Get intent per-ID lock or global-index lock from master.
203  *
204  * \param env    - the environment passed by the caller
205  * \param exp    - is the export to use to send the intent RPC
206  * \param qbody  - quota body to be packed in request
207  * \param sync   - synchronous or asynchronous (pre-acquire)
208  * \param it_op  - IT_QUOTA_DQACQ or IT_QUOTA_CONN
209  * \param completion - completion callback
210  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
211  *                 function
212  * \param lvb    - is the lvb associated with the lock and returned by the
213  *                 server
214  * \param arg    - is an opaq argument passed to the completion callback
215  *
216  * \retval 0     - success
217  * \retval -ve   - appropriate errors
218  */
219 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
220                     struct quota_body *qbody, bool sync, int it_op,
221                     qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
222                     struct lquota_lvb *lvb, void *arg)
223 {
224         struct qsd_thread_info  *qti = qsd_info(env);
225         struct ptlrpc_request   *req;
226         struct qsd_async_args   *aa = NULL;
227         struct ldlm_intent      *lit;
228         struct quota_body       *req_qbody;
229         __u64                    flags = LDLM_FL_HAS_INTENT;
230         int                      rc;
231         ENTRY;
232
233         LASSERT(exp != NULL);
234         LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
235
236         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
237
238         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
239                                    &RQF_LDLM_INTENT_QUOTA);
240         if (req == NULL)
241                 GOTO(out, rc = -ENOMEM);
242
243         req->rq_no_retry_einprogress = 1;
244         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 GOTO(out, rc);
248         }
249         req->rq_request_portal = MDS_READPAGE_PORTAL;
250
251         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
252         lit->opc = (__u64)it_op;
253
254         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
255         *req_qbody = *qbody;
256
257         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
258                              sizeof(*lvb));
259         ptlrpc_request_set_replen(req);
260
261         switch(it_op) {
262         case IT_QUOTA_CONN:
263                 /* build resource name associated with global index */
264                 fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
265
266                 /* copy einfo template and fill ei_cbdata with qqi pointer */
267                 memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
268                 qti->qti_einfo.ei_cbdata = qqi;
269
270                 /* don't cancel global lock on memory pressure */
271                 flags |= LDLM_FL_NO_LRU;
272                 break;
273         case IT_QUOTA_DQACQ:
274                 /* build resource name associated for per-ID quota lock */
275                 fid_build_quota_res_name(&qbody->qb_fid, &qbody->qb_id,
276                                          &qti->qti_resid);
277
278                 /* copy einfo template and fill ei_cbdata with lqe pointer */
279                 memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
280                 qti->qti_einfo.ei_cbdata = arg;
281                 break;
282         default:
283                 LASSERTF(0, "invalid it_op %d\n", it_op);
284         }
285
286         /* build lock enqueue request */
287         rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
288                               &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA,
289                               &qti->qti_lockh, 1);
290         if (rc < 0) {
291                 ptlrpc_req_finished(req);
292                 GOTO(out, rc);
293         }
294
295         /* grab reference on backend structure for the new lock */
296         switch(it_op) {
297         case IT_QUOTA_CONN:
298                 /* grab reference on qqi for new lock */
299 #ifdef USE_LU_REF
300         {
301                 struct ldlm_lock        *lock;
302
303                 lock = ldlm_handle2lock(&qti->qti_lockh);
304                 if (lock == NULL) {
305                         ptlrpc_req_finished(req);
306                         GOTO(out, rc = -ENOLCK);
307                 }
308                 lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
309                 LDLM_LOCK_PUT(lock);
310         }
311 #endif
312                 qqi_getref(qqi);
313                 break;
314         case IT_QUOTA_DQACQ:
315                 /* grab reference on lqe for new lock */
316                 lqe_getref((struct lquota_entry *)arg);
317                 /* all acquire/release request are sent with no_resend and
318                  * no_delay flag */
319                 req->rq_no_resend = req->rq_no_delay = 1;
320                 break;
321         default:
322                 break;
323         }
324
325         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
326         aa = ptlrpc_req_async_args(req);
327         aa->aa_exp = exp;
328         aa->aa_qqi = qqi;
329         aa->aa_arg = arg;
330         aa->aa_lvb = lvb;
331         aa->aa_completion = completion;
332         lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
333
334         if (sync) {
335                 /* send lock enqueue request and wait for completion */
336                 rc = ptlrpc_queue_wait(req);
337                 rc = qsd_intent_interpret(env, req, aa, rc);
338                 ptlrpc_req_finished(req);
339         } else {
340                 /* queue lock request and return */
341                 req->rq_interpret_reply = qsd_intent_interpret;
342                 ptlrpcd_add_req(req);
343         }
344
345         RETURN(rc);
346 out:
347         completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
348         return rc;
349 }
350
351 /*
352  * Fetch a global or slave index from the QMT.
353  *
354  * \param env    - the environment passed by the caller
355  * \param exp    - is the export to use to issue the OBD_IDX_READ RPC
356  * \param ii     - is the index information to be packed in the request
357  *                 on success, the index information returned by the server
358  *                 is copied there.
359  * \param npages - is the number of pages in the pages array
360  * \param pages  - is an array of @npages pages
361  *
362  * \retval 0     - success
363  * \retval -ve   - appropriate errors
364  */
365 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
366                     struct idx_info *ii, unsigned int npages,
367                     struct page **pages, bool *need_swab)
368 {
369         struct ptlrpc_request   *req;
370         struct idx_info         *req_ii;
371         struct ptlrpc_bulk_desc *desc;
372         int                      rc, i;
373         ENTRY;
374
375         LASSERT(exp);
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         req->rq_request_portal = MDS_READPAGE_PORTAL;
388         ptlrpc_at_set_req_timeout(req);
389
390         /* allocate bulk descriptor */
391         desc = ptlrpc_prep_bulk_imp(req, npages, 1,
392                                     PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
393                                     MDS_BULK_PORTAL,
394                                     &ptlrpc_bulk_kiov_pin_ops);
395         if (desc == NULL)
396                 GOTO(out, rc = -ENOMEM);
397
398         /* req now owns desc and will free it when it gets freed */
399         for (i = 0; i < npages; i++)
400                 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
401                                                  PAGE_SIZE);
402
403         /* pack index information in request */
404         req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
405         *req_ii = *ii;
406
407         ptlrpc_request_set_replen(req);
408
409         /* send request to master and wait for RPC to complete */
410         rc = ptlrpc_queue_wait(req);
411         if (rc)
412                 GOTO(out, rc);
413
414         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
415                                           req->rq_bulk->bd_nob_transferred);
416         if (rc < 0)
417                 GOTO(out, rc);
418         else
419                 /* sptlrpc_cli_unwrap_bulk_read() returns the number of bytes
420                  * transferred*/
421                 rc = 0;
422
423         req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
424         *ii = *req_ii;
425
426         *need_swab = ptlrpc_rep_need_swab(req);
427
428         EXIT;
429 out:
430         ptlrpc_req_finished(req);
431         return rc;
432 }