Whamcloud - gitweb
LU-2446 build: Update Whamcloud copyright messages for Intel
[fs/lustre-release.git] / lustre / quota / qsd_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include <lustre_net.h>
38 #include <lustre_import.h>
39 #include <lustre_dlm.h>
40 #include <obd_class.h>
41
42 #include "qsd_internal.h"
43
/*
 * Context saved in a request's async-args area by the senders in this file
 * and handed back to the interpret callbacks as their opaque argument.
 */
struct qsd_async_args {
        /* export the request was issued on; needed to finish a lock enqueue */
        struct obd_export     *aa_exp;
        /* quota type info forwarded untouched to the completion callback */
        struct qsd_qtype_info *aa_qqi;
        /* opaque caller cookie forwarded to the completion callback
         * (the lquota entry for acquire/release requests) */
        void                  *aa_arg;
        /* lvb buffer filled on enqueue completion; only set by the intent
         * lock path */
        struct lquota_lvb     *aa_lvb;
        /* private copy of the lock handle associated with the request */
        struct lustre_handle   aa_lockh;
        /* callback invoked once the request outcome is known */
        qsd_req_completion_t   aa_completion;
};
52
53 /*
54  * non-intent quota request interpret callback.
55  *
56  * \param env    - the environment passed by the caller
57  * \param req    - the non-intent quota request
58  * \param arg    - qsd_async_args
59  * \param rc     - request status
60  *
61  * \retval 0     - success
62  * \retval -ve   - appropriate errors
63  */
64 static int qsd_dqacq_interpret(const struct lu_env *env,
65                                struct ptlrpc_request *req, void *arg, int rc)
66 {
67         struct quota_body     *rep_qbody = NULL, *req_qbody;
68         struct qsd_async_args *aa = (struct qsd_async_args *)arg;
69         ENTRY;
70
71         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
72         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
73                 rep_qbody = req_capsule_server_get(&req->rq_pill,
74                                                    &RMF_QUOTA_BODY);
75         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
76                           NULL, aa->aa_arg, rc);
77         RETURN(rc);
78 }
79
80 /*
81  * Send non-intent quota request to master.
82  *
83  * \param env    - the environment passed by the caller
84  * \param exp    - is the export to use to send the acquire RPC
85  * \param qbody  - quota body to be packed in request
86  * \param sync   - synchronous or asynchronous
87  * \param completion - completion callback
88  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
89  *                 function
90  * \param lqe    - is the qid entry to be processed
91  *
92  * \retval 0     - success
93  * \retval -ve   - appropriate errors
94  */
95 int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
96                    struct quota_body *qbody, bool sync,
97                    qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
98                    struct lustre_handle *lockh, struct lquota_entry *lqe)
99 {
100         struct ptlrpc_request   *req;
101         struct quota_body       *req_qbody;
102         struct qsd_async_args   *aa;
103         int                      rc;
104         ENTRY;
105
106         LASSERT(exp);
107
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
109         if (req == NULL)
110                 GOTO(out, rc = -ENOMEM);
111
112         req->rq_no_resend = req->rq_no_delay = 1;
113         req->rq_no_retry_einprogress = 1;
114         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
115         if (rc) {
116                 ptlrpc_request_free(req);
117                 GOTO(out, rc);
118         }
119
120         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
121         *req_qbody = *qbody;
122
123         ptlrpc_request_set_replen(req);
124
125         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
126         aa = ptlrpc_req_async_args(req);
127         aa->aa_exp = exp;
128         aa->aa_qqi = qqi;
129         aa->aa_arg = (void *)lqe;
130         aa->aa_completion = completion;
131         lustre_handle_copy(&aa->aa_lockh, lockh);
132
133         if (sync) {
134                 rc = ptlrpc_queue_wait(req);
135                 rc = qsd_dqacq_interpret(env, req, aa, rc);
136                 ptlrpc_req_finished(req);
137         } else {
138                 req->rq_interpret_reply = qsd_dqacq_interpret;
139                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
140         }
141
142         RETURN(rc);
143 out:
144         completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
145         return rc;
146 }
147
148 /*
149  * intent quota request interpret callback.
150  *
151  * \param env    - the environment passed by the caller
152  * \param req    - the intent quota request
153  * \param arg    - qsd_async_args
154  * \param rc     - request status
155  *
156  * \retval 0     - success
157  * \retval -ve   - appropriate errors
158  */
159 static int qsd_intent_interpret(const struct lu_env *env,
160                                 struct ptlrpc_request *req, void *arg, int rc)
161 {
162         struct lustre_handle     *lockh;
163         struct quota_body        *rep_qbody = NULL, *req_qbody;
164         struct ldlm_intent       *lit;
165         struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
166         __u64                     flags = LDLM_FL_HAS_INTENT;
167         ENTRY;
168
169         LASSERT(aa->aa_exp);
170         lockh = &aa->aa_lockh;
171         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
172         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
173
174         rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
175                                    &flags, (void *)aa->aa_lvb,
176                                    sizeof(struct lquota_lvb), lockh, rc);
177         if (rc < 0)
178                 /* the lock has been destroyed, forget about the lock handle */
179                 memset(lockh, 0, sizeof(*lockh));
180
181         if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
182                 rep_qbody = req_capsule_server_get(&req->rq_pill,
183                                                    &RMF_QUOTA_BODY);
184
185         aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
186                           aa->aa_lvb, aa->aa_arg, rc);
187         RETURN(rc);
188 }
189
190 /*
191  * Get intent per-ID lock or global-index lock from master.
192  *
193  * \param env    - the environment passed by the caller
194  * \param exp    - is the export to use to send the intent RPC
195  * \param qbody  - quota body to be packed in request
196  * \param sync   - synchronous or asynchronous (pre-acquire)
197  * \param it_op  - IT_QUOTA_DQACQ or IT_QUOTA_CONN
198  * \param completion - completion callback
199  * \param qqi    - is the qsd_qtype_info structure to pass to the completion
200  *                 function
201  * \param lvb    - is the lvb associated with the lock and returned by the
202  *                 server
203  * \param arg    - is an opaq argument passed to the completion callback
204  *
205  * \retval 0     - success
206  * \retval -ve   - appropriate errors
207  */
208 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
209                     struct quota_body *qbody, bool sync, int it_op,
210                     qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
211                     struct lquota_lvb *lvb, void *arg)
212 {
213         struct qsd_thread_info  *qti = qsd_info(env);
214         struct ptlrpc_request   *req;
215         struct qsd_async_args   *aa = NULL;
216         struct ldlm_intent      *lit;
217         struct quota_body       *req_qbody;
218         __u64                    flags = LDLM_FL_HAS_INTENT;
219         int                      rc;
220         ENTRY;
221
222         LASSERT(exp != NULL);
223         LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
224
225         memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
226
227         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
228                                    &RQF_LDLM_INTENT_QUOTA);
229         if (req == NULL)
230                 GOTO(out, rc = -ENOMEM);
231
232         req->rq_no_retry_einprogress = 1;
233         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
234         if (rc) {
235                 ptlrpc_request_free(req);
236                 GOTO(out, rc);
237         }
238
239         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
240         lit->opc = (__u64)it_op;
241
242         req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
243         *req_qbody = *qbody;
244
245         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
246                              sizeof(*lvb));
247         ptlrpc_request_set_replen(req);
248
249         switch(it_op) {
250         case IT_QUOTA_CONN:
251                 /* build resource name associated with global index */
252                 fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
253
254                 /* copy einfo template and fill ei_cbdata with qqi pointer */
255                 memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
256                 qti->qti_einfo.ei_cbdata = qqi;
257
258                 /* don't cancel global lock on memory pressure */
259                 flags |= LDLM_FL_NO_LRU;
260                 break;
261         case IT_QUOTA_DQACQ:
262                 /* build resource name associated for per-ID quota lock */
263                 fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
264                                       &qti->qti_resid);
265
266                 /* copy einfo template and fill ei_cbdata with lqe pointer */
267                 memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
268                 qti->qti_einfo.ei_cbdata = arg;
269                 break;
270         default:
271                 LASSERTF(0, "invalid it_op %d", it_op);
272         }
273
274         /* build lock enqueue request */
275         rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
276                               &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA,
277                               &qti->qti_lockh, 1);
278         if (rc < 0) {
279                 ptlrpc_req_finished(req);
280                 GOTO(out, rc);
281         }
282
283         /* grab reference on backend structure for the new lock */
284         switch(it_op) {
285         case IT_QUOTA_CONN:
286                 /* grab reference on qqi for new lock */
287 #ifdef USE_LU_REF
288                 struct ldlm_lock        *lock;
289
290                 lock = ldlm_handle2lock(&qti->qti_lockh);
291                 if (lock == NULL) {
292                         ptlrpc_req_finished(req);
293                         GOTO(out, -ENOLCK);
294                 }
295                 lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
296                 LDLM_LOCK_PUT(lock);
297 #endif
298                 qqi_getref(qqi);
299                 break;
300         case IT_QUOTA_DQACQ:
301                 /* grab reference on lqe for new lock */
302                 lqe_getref((struct lquota_entry *)arg);
303                 /* all acquire/release request are sent with no_resend and
304                  * no_delay flag */
305                 req->rq_no_resend = req->rq_no_delay = 1;
306                 break;
307         default:
308                 break;
309         }
310
311         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
312         aa = ptlrpc_req_async_args(req);
313         aa->aa_exp = exp;
314         aa->aa_qqi = qqi;
315         aa->aa_arg = arg;
316         aa->aa_lvb = lvb;
317         aa->aa_completion = completion;
318         lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
319
320         if (sync) {
321                 /* send lock enqueue request and wait for completion */
322                 rc = ptlrpc_queue_wait(req);
323                 rc = qsd_intent_interpret(env, req, aa, rc);
324                 ptlrpc_req_finished(req);
325         } else {
326                 /* queue lock request and return */
327                 req->rq_interpret_reply = qsd_intent_interpret;
328                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
329         }
330
331         RETURN(rc);
332 out:
333         completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
334         return rc;
335 }
336
337 /*
338  * Fetch a global or slave index from the QMT.
339  *
340  * \param env    - the environment passed by the caller
341  * \param exp    - is the export to use to issue the OBD_IDX_READ RPC
342  * \param ii     - is the index information to be packed in the request
343  *                 on success, the index information returned by the server
344  *                 is copied there.
345  * \param npages - is the number of pages in the pages array
346  * \param pages  - is an array of @npages pages
347  *
348  * \retval 0     - success
349  * \retval -ve   - appropriate errors
350  */
351 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
352                     struct idx_info *ii, unsigned int npages,
353                     cfs_page_t **pages, bool *need_swab)
354 {
355         struct ptlrpc_request   *req;
356         struct idx_info         *req_ii;
357         struct ptlrpc_bulk_desc *desc;
358         int                      rc, i;
359         ENTRY;
360
361         LASSERT(exp);
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
364         if (req == NULL)
365                 RETURN(-ENOMEM);
366
367         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
368         if (rc) {
369                 ptlrpc_request_free(req);
370                 RETURN(rc);
371         }
372
373         req->rq_request_portal = MDS_READPAGE_PORTAL;
374         ptlrpc_at_set_req_timeout(req);
375
376         /* allocate bulk descriptor */
377         desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
378                                     MDS_BULK_PORTAL);
379         if (desc == NULL) {
380                 ptlrpc_request_free(req);
381                 RETURN(-ENOMEM);
382         }
383
384         /* req now owns desc and will free it when it gets freed */
385         for (i = 0; i < npages; i++)
386                 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, CFS_PAGE_SIZE);
387
388         /* pack index information in request */
389         req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
390         *req_ii = *ii;
391
392         ptlrpc_request_set_replen(req);
393
394         /* send request to master and wait for RPC to complete */
395         rc = ptlrpc_queue_wait(req);
396         if (rc)
397                 GOTO(out, rc);
398
399         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
400                                           req->rq_bulk->bd_nob_transferred);
401         if (rc < 0)
402                 GOTO(out, rc);
403         rc = 0;
404
405         req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
406         *ii = *req_ii;
407
408         *need_swab = ptlrpc_rep_need_swab(req);
409
410         EXIT;
411 out:
412         ptlrpc_req_finished(req);
413         return rc;
414 }