LU-15189 osc: don't have extra nvidia call
fs/lustre-release.git: lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"
#include <lnet/lnet_rdma.h>

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

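/* Pack @oa into the OST_BODY request buffer of @req, converting the local
 * obdo to wire format according to the import's connect data. */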
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

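/* Synchronous OST_GETATTR: send @oa to the OST and, on success, refresh
 * @oa in place from the reply, adding the preferred BRW size as o_blksize. */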
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

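/* Synchronous OST_SETATTR: @oa must have OBD_MD_FLGROUP set; on success
 * @oa is refreshed from the attributes returned in the reply. */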
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

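/* Asynchronous OST_SETATTR: if @rqset is NULL the request is handed to
 * ptlrpcd without waiting for a reply, otherwise it is added to @rqset and
 * @upcall(@cookie, rc) runs from the reply interpreter on completion. */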
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC finishes
 * @cookie:     Opaque cookie passed back to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation and the standard preallocate operation are
 * supported currently; other mode flags are not supported yet.
 * ftruncate(2) and truncate(2) are handled via a SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

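/* Send an OST_SYNC request for @obj; the range to flush travels in the
 * size/blocks fields of @oa (see the packing comment below). Completion
 * is reported to @upcall, and the reply also refreshes the object's
 * blocks attribute. */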
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel locks matching @mode in the resource identified
 * by @oa->o_oi. Matched locks are added to the @cancels list. Returns the
 * number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

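/* Destroy an object on the OST: matching PW locks are cancelled locally
 * with LDLM_FL_DISCARD_DATA and packed as early lock cancels into the
 * OST_DESTROY request, which is sent via ptlrpcd without waiting for the
 * reply. In-flight destroys are throttled to cl_max_rpcs_in_flight by
 * osc_can_send_destroy(). */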
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpcs_in_flight
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

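/* Fill the dirty/undirty/grant/dropped fields of @oa so the RPC carrying
 * it also reports the client's cache and grant usage to the server. */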
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, thus they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}
EXPORT_SYMBOL(osc_update_next_shrink);

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

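/* Carve a quarter of the locally available grant out of @cli and pack it
 * into @oa with OBD_FL_SHRINK_GRANT set; the caller is expected to send
 * @oa in an RPC that returns this grant to the server. */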
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
EXPORT_SYMBOL(osc_schedule_grant_work);

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

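/* Validate the per-niobuf return codes in a BRW_WRITE reply: return the
 * first negative rc found, treat any other non-zero rc as -EPROTO, and
 * verify that the bulk transferred exactly @requested_nob bytes. */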
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

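/* Two brw_pages can be merged into a single niobuf when their flags match
 * (ignoring bits known to be safe to combine) and p2 starts exactly where
 * p1 ends. */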
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC |
                                  OBD_BRW_SYS_RESOURCE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum, bool resend)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        CDEBUG(D_PAGE | (resend ? D_HA : 0),
               "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
               guard_number, resend, nob, pg_count);

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (unlikely(resend))
                        CDEBUG(D_PAGE | D_HA,
                               "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
                               i, used, pga[i]->off & ~PAGE_MASK, count,
                               (int)(used * sizeof(*guard_start)),
                               guard_start + used_number);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

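/* Compute the bulk checksum of @nob bytes spread across @pga using the
 * plain (non-T10) hash algorithm selected by @cksum_type. */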
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

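/* Checksum dispatcher: use the T10-PI path when @cksum_type maps to a DIF
 * checksum function, otherwise fall back to the plain bulk checksum. */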
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum, bool resend)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum,
                                             resend);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

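/* Build an OST_READ/OST_WRITE BRW request covering @page_count pages in
 * @pga. Writes are allocated from osc_rq_pool, which exists so that
 * writeout can make progress even when memory is short; pages of
 * encrypted files are first run through llcrypt bounce pages. */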
1391 static int
1392 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1393                      u32 page_count, struct brw_page **pga,
1394                      struct ptlrpc_request **reqp, int resend)
1395 {
1396         struct ptlrpc_request *req;
1397         struct ptlrpc_bulk_desc *desc;
1398         struct ost_body *body;
1399         struct obd_ioobj *ioobj;
1400         struct niobuf_remote *niobuf;
1401         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1402         struct osc_brw_async_args *aa;
1403         struct req_capsule *pill;
1404         struct brw_page *pg_prev;
1405         void *short_io_buf;
1406         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1407         struct inode *inode = NULL;
1408         bool directio = false;
1409         bool enable_checksum = true;
1410
1411         ENTRY;
1412         if (pga[0]->pg) {
1413                 inode = page2inode(pga[0]->pg);
1414                 if (inode == NULL) {
1415                         /* Try to get reference to inode from cl_page if we are
1416                          * dealing with direct IO, as handled pages are not
1417                          * actual page cache pages.
1418                          */
1419                         struct osc_async_page *oap = brw_page2oap(pga[0]);
1420                         struct cl_page *clpage = oap2cl_page(oap);
1421
1422                         inode = clpage->cp_inode;
1423                         if (inode)
1424                                 directio = true;
1425                 }
1426         }
1427         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1428                 RETURN(-ENOMEM); /* Recoverable */
1429         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1430                 RETURN(-EINVAL); /* Fatal */
1431
1432         if ((cmd & OBD_BRW_WRITE) != 0) {
1433                 opc = OST_WRITE;
1434                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1435                                                 osc_rq_pool,
1436                                                 &RQF_OST_BRW_WRITE);
1437         } else {
1438                 opc = OST_READ;
1439                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1440         }
1441         if (req == NULL)
1442                 RETURN(-ENOMEM);
1443
1444         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1445             llcrypt_has_encryption_key(inode)) {
1446                 for (i = 0; i < page_count; i++) {
1447                         struct brw_page *pg = pga[i];
1448                         struct page *data_page = NULL;
1449                         bool retried = false;
1450                         bool lockedbymyself;
1451                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1452                         struct address_space *map_orig = NULL;
1453                         pgoff_t index_orig;
1454
1455 retry_encrypt:
1456                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1457                         /* The page can already be locked when we arrive here.
1458                          * This is possible when cl_page_assume/vvp_page_assume
1459                          * is stuck on wait_on_page_writeback with the page lock
1460                          * held. In this case there is no risk of the lock being
1461                          * released while we are doing our encryption processing,
1462                          * because writeback against that page ends in
1463                          * vvp_page_completion_write/cl_page_completion, which
1464                          * only happens once the page is fully processed.
1465                          */
1466                         lockedbymyself = trylock_page(pg->pg);
1467                         if (directio) {
1468                                 map_orig = pg->pg->mapping;
1469                                 pg->pg->mapping = inode->i_mapping;
1470                                 index_orig = pg->pg->index;
1471                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1472                         }
1473                         data_page =
1474                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1475                                                                  nunits, 0,
1476                                                                  GFP_NOFS);
1477                         if (directio) {
1478                                 pg->pg->mapping = map_orig;
1479                                 pg->pg->index = index_orig;
1480                         }
1481                         if (lockedbymyself)
1482                                 unlock_page(pg->pg);
1483                         if (IS_ERR(data_page)) {
1484                                 rc = PTR_ERR(data_page);
1485                                 if (rc == -ENOMEM && !retried) {
1486                                         retried = true;
1487                                         rc = 0;
1488                                         goto retry_encrypt;
1489                                 }
1490                                 ptlrpc_request_free(req);
1491                                 RETURN(rc);
1492                         }
1493                         /* Set PageChecked flag on bounce page for
1494                          * disambiguation in osc_release_bounce_pages().
1495                          */
1496                         SetPageChecked(data_page);
1497                         pg->pg = data_page;
1498                         /* there should be no gap in the middle of the page array */
1499                         if (i == page_count - 1) {
1500                                 struct osc_async_page *oap = brw_page2oap(pg);
1501
1502                                 oa->o_size = oap->oap_count +
1503                                         oap->oap_obj_off + oap->oap_page_off;
1504                         }
1505                         /* len is forced to nunits, and the relative offset to 0,
1506                          * so store the old clear-text info
1507                          */
1508                         pg->bp_count_diff = nunits - pg->count;
1509                         pg->count = nunits;
1510                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1511                         pg->off = pg->off & PAGE_MASK;
1512                 }
1513         } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1514                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1515                 struct cl_page *clpage = oap2cl_page(oap);
1516                 struct cl_object *clobj = clpage->cp_obj;
1517                 struct cl_attr attr = { 0 };
1518                 struct lu_env *env;
1519                 __u16 refcheck;
1520
1521                 env = cl_env_get(&refcheck);
1522                 if (IS_ERR(env)) {
1523                         rc = PTR_ERR(env);
1524                         ptlrpc_request_free(req);
1525                         RETURN(rc);
1526                 }
1527
1528                 cl_object_attr_lock(clobj);
1529                 rc = cl_object_attr_get(env, clobj, &attr);
1530                 cl_object_attr_unlock(clobj);
1531                 cl_env_put(env, &refcheck);
1532                 if (rc != 0) {
1533                         ptlrpc_request_free(req);
1534                         RETURN(rc);
1535                 }
1536                 if (attr.cat_size)
1537                         oa->o_size = attr.cat_size;
1538         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1539                    llcrypt_has_encryption_key(inode)) {
1540                 for (i = 0; i < page_count; i++) {
1541                         struct brw_page *pg = pga[i];
1542                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1543
1544                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1545                         /* count/off are forced to cover the whole encryption
1546                          * unit size so that all encrypted data is stored on the
1547                          * OST; adjust bp_{count,off}_diff to remember the size
1548                          * of the clear text.
1549                          */
1550                         pg->bp_count_diff = nunits - pg->count;
1551                         pg->count = nunits;
1552                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1553                         pg->off = pg->off & PAGE_MASK;
1554                 }
1555         }
1556
1557         for (niocount = i = 1; i < page_count; i++) {
1558                 if (!can_merge_pages(pga[i - 1], pga[i]))
1559                         niocount++;
1560         }
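        /* Illustrative note (a sketch, not part of the original code):
         * assuming can_merge_pages() merges pages that are contiguous in
         * offset with compatible brw flags, four pages at offsets 0K, 4K,
         * 8K and 64K would give niocount == 2: the first three collapse
         * into one remote niobuf and the fourth starts another.
         */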
1561
1562         pill = &req->rq_pill;
1563         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1564                              sizeof(*ioobj));
1565         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1566                              niocount * sizeof(*niobuf));
1567
1568         for (i = 0; i < page_count; i++) {
1569                 short_io_size += pga[i]->count;
1570                 if (!inode || !IS_ENCRYPTED(inode) ||
1571                     !llcrypt_has_encryption_key(inode)) {
1572                         pga[i]->bp_count_diff = 0;
1573                         pga[i]->bp_off_diff = 0;
1574                 }
1575         }
1576
1577         if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1578                 enable_checksum = false;
1579                 short_io_size = 0;
1580         }
1581
1582         /* Check if read/write is small enough to be a short io. */
1583         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1584             !imp_connect_shortio(cli->cl_import))
1585                 short_io_size = 0;
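        /* Illustration (a sketch under the conditions checked above): a
         * single contiguous 4KiB write (niocount == 1) with
         * cl_max_short_io_bytes >= 4096 and a server supporting short io
         * is copied inline into the request buffer further below, instead
         * of setting up a bulk transfer.
         */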
1586
1587         /* If this is an empty RPC to an old server, just ignore it */
1588         if (!short_io_size && !pga[0]->pg) {
1589                 ptlrpc_request_free(req);
1590                 RETURN(-ENODATA);
1591         }
1592
1593         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1594                              opc == OST_READ ? 0 : short_io_size);
1595         if (opc == OST_READ)
1596                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1597                                      short_io_size);
1598
1599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1600         if (rc) {
1601                 ptlrpc_request_free(req);
1602                 RETURN(rc);
1603         }
1604         osc_set_io_portal(req);
1605
1606         ptlrpc_at_set_req_timeout(req);
1607         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1608          * retry logic */
1609         req->rq_no_retry_einprogress = 1;
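        /* (the BRW-level retry lives in brw_interpret()/osc_brw_redo_request()
         * further below, which resends on -EINPROGRESS without a resend cap) */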
1610
1611         if (short_io_size != 0) {
1612                 desc = NULL;
1613                 short_io_buf = NULL;
1614                 goto no_bulk;
1615         }
1616
1617         desc = ptlrpc_prep_bulk_imp(req, page_count,
1618                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1619                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1620                         PTLRPC_BULK_PUT_SINK),
1621                 OST_BULK_PORTAL,
1622                 &ptlrpc_bulk_kiov_pin_ops);
1623
1624         if (desc == NULL)
1625                 GOTO(out, rc = -ENOMEM);
1626         /* NB: the request now owns desc and will free it when the request is freed */
1627 no_bulk:
1628         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1629         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1630         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1631         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1632
1633         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1634
1635         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1636          * and from_kgid(), because these requests are sent asynchronously.
1637          * Fortunately, variable oa contains valid o_uid and o_gid for these
1638          * two operations. Besides, filling o_uid and o_gid is enough for
1639          * nrs-tbf, see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set in
1640          * order to avoid breaking other process logic */
1641         body->oa.o_uid = oa->o_uid;
1642         body->oa.o_gid = oa->o_gid;
1643
1644         obdo_to_ioobj(oa, ioobj);
1645         ioobj->ioo_bufcnt = niocount;
1646         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1647          * of bulks that might be sent for this request.  The actual number is
1648          * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
1649          * sends "max - 1" for compatibility with old clients sending "0", and
1650          * so that the actual maximum is a power-of-two number, not one less. LU-1431 */
1651         if (desc != NULL)
1652                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1653         else /* short io */
1654                 ioobj_max_brw_set(ioobj, 0);
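        /* Sketch of the "max - 1" encoding described above (assuming
         * ioobj_max_brw_set() packs the value into the high bits of
         * ioo_max_brw): for desc->bd_md_max_brw == 4 the wire value is 3,
         * and the server recovers 3 + 1 == 4, a power of two.  The short io
         * path passes 0 since no bulk descriptor is used at all.
         */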
1655
1656         if (short_io_size != 0) {
1657                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1658                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1659                         body->oa.o_flags = 0;
1660                 }
1661                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1662                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1663                        short_io_size);
1664                 if (opc == OST_WRITE) {
1665                         short_io_buf = req_capsule_client_get(pill,
1666                                                               &RMF_SHORT_IO);
1667                         LASSERT(short_io_buf != NULL);
1668                 }
1669         }
1670
1671         LASSERT(page_count > 0);
1672         pg_prev = pga[0];
1673         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1674                 struct brw_page *pg = pga[i];
1675                 int poff = pg->off & ~PAGE_MASK;
1676
1677                 LASSERT(pg->count > 0);
1678                 /* make sure there is no gap in the middle of page array */
1679                 LASSERTF(page_count == 1 ||
1680                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1681                           ergo(i > 0 && i < page_count - 1,
1682                                poff == 0 && pg->count == PAGE_SIZE)   &&
1683                           ergo(i == page_count - 1, poff == 0)),
1684                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1685                          i, page_count, pg, pg->off, pg->count);
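                /* i.e. only the first page may start at a non-zero offset and
                 * only the last page may end before PAGE_SIZE; any interior
                 * pages must be full pages. */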
1686                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1687                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1688                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1689                          i, page_count,
1690                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1691                          pg_prev->pg, page_private(pg_prev->pg),
1692                          pg_prev->pg->index, pg_prev->off);
1693                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1694                         (pg->flag & OBD_BRW_SRVLOCK));
1695                 if (short_io_size != 0 && opc == OST_WRITE) {
1696                         unsigned char *ptr = kmap_atomic(pg->pg);
1697
1698                         LASSERT(short_io_size >= requested_nob + pg->count);
1699                         memcpy(short_io_buf + requested_nob,
1700                                ptr + poff,
1701                                pg->count);
1702                         kunmap_atomic(ptr);
1703                 } else if (short_io_size == 0) {
1704                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1705                                                          pg->count);
1706                 }
1707                 requested_nob += pg->count;
1708
1709                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1710                         niobuf--;
1711                         niobuf->rnb_len += pg->count;
1712                 } else {
1713                         niobuf->rnb_offset = pg->off;
1714                         niobuf->rnb_len    = pg->count;
1715                         niobuf->rnb_flags  = pg->flag;
1716                 }
1717                 pg_prev = pg;
1718         }
1719
1720         LASSERTF((void *)(niobuf - niocount) ==
1721                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1722                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1723                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1724
1725         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1726         if (resend) {
1727                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1728                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1729                         body->oa.o_flags = 0;
1730                 }
1731                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1732         }
1733
1734         if (osc_should_shrink_grant(cli))
1735                 osc_shrink_grant_local(cli, &body->oa);
1736
1737         if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1738                 enable_checksum = false;
1739
1740         /* size[REQ_REC_OFF] still sizeof (*body) */
1741         if (opc == OST_WRITE) {
1742                 if (enable_checksum) {
1743                         /* store cl_cksum_type in a local variable since
1744                          * it can be changed via lprocfs */
1745                         enum cksum_types cksum_type = cli->cl_cksum_type;
1746
1747                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1748                                 body->oa.o_flags = 0;
1749
1750                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1751                                                                 cksum_type);
1752                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1753
1754                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1755                                                   requested_nob, page_count,
1756                                                   pga, OST_WRITE,
1757                                                   &body->oa.o_cksum, resend);
1758                         if (rc < 0) {
1759                                 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1760                                        rc);
1761                                 GOTO(out, rc);
1762                         }
1763                         CDEBUG(D_PAGE | (resend ? D_HA : 0),
1764                                "checksum at write origin: %x (%x)\n",
1765                                body->oa.o_cksum, cksum_type);
1766
1767                         /* save this in 'oa', too, for later checking */
1768                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1769                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1770                                                            cksum_type);
1771                 } else {
1772                         /* clear out the checksum flag, in case this is a
1773                          * resend but cl_checksum is no longer set. b=11238 */
1774                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1775                 }
1776                 oa->o_cksum = body->oa.o_cksum;
1777                 /* 1 RC per niobuf */
1778                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1779                                      sizeof(__u32) * niocount);
1780         } else {
1781                 if (enable_checksum) {
1782                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1783                                 body->oa.o_flags = 0;
1784                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1785                                 cli->cl_cksum_type);
1786                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1787                 }
1788
1789                 /* The client cksum has already been copied to the wire obdo in
1790                  * the previous lustre_set_wire_obdo(), so in case a bulk-read is
1791                  * resent due to a cksum error, this will allow the server to
1792                  * check+dump the pages on its side */
1793         }
1794         ptlrpc_request_set_replen(req);
1795
1796         aa = ptlrpc_req_async_args(aa, req);
1797         aa->aa_oa = oa;
1798         aa->aa_requested_nob = requested_nob;
1799         aa->aa_nio_count = niocount;
1800         aa->aa_page_count = page_count;
1801         aa->aa_resends = 0;
1802         aa->aa_ppga = pga;
1803         aa->aa_cli = cli;
1804         INIT_LIST_HEAD(&aa->aa_oaps);
1805
1806         *reqp = req;
1807         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1808         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1809                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1810                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1811         RETURN(0);
1812
1813  out:
1814         ptlrpc_req_finished(req);
1815         RETURN(rc);
1816 }
1817
1818 char dbgcksum_file_name[PATH_MAX];
1819
1820 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1821                                 struct brw_page **pga, __u32 server_cksum,
1822                                 __u32 client_cksum)
1823 {
1824         struct file *filp;
1825         int rc, i;
1826         unsigned int len;
1827         char *buf;
1828
1829         /* Only keep a dump of the pages on the first error for the same range
1830          * in the file/fid, not during the resends/retries. */
1831         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1832                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1833                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1834                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1835                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1836                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1837                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1838                  pga[0]->off,
1839                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1840                  client_cksum, server_cksum);
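        /* Example resulting name (illustrative; assumes DFID renders as
         * [0xseq:0xoid:0xver] and the default debug path is /tmp/lustre-log):
         * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-a1b2c3d4-e5f6a7b8
         */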
1841         CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
1842         filp = filp_open(dbgcksum_file_name,
1843                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1844         if (IS_ERR(filp)) {
1845                 rc = PTR_ERR(filp);
1846                 if (rc == -EEXIST)
1847                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1848                                "checksum error: rc = %d\n", dbgcksum_file_name,
1849                                rc);
1850                 else
1851                         CERROR("%s: can't open to dump pages with checksum "
1852                                "error: rc = %d\n", dbgcksum_file_name, rc);
1853                 return;
1854         }
1855
1856         for (i = 0; i < page_count; i++) {
1857                 len = pga[i]->count;
1858                 buf = kmap(pga[i]->pg);
1859                 while (len != 0) {
1860                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1861                         if (rc < 0) {
1862                                 CERROR("%s: wanted to write %u but got %d "
1863                                        "error\n", dbgcksum_file_name, len, rc);
1864                                 break;
1865                         }
1866                         len -= rc;
1867                         buf += rc;
1868                 }
1869                 kunmap(pga[i]->pg);
1870         }
1871
1872         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1873         if (rc)
1874                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1875         filp_close(filp, NULL);
1876
1877         libcfs_debug_dumplog();
1878 }
1879
1880 static int
1881 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1882                      __u32 client_cksum, __u32 server_cksum,
1883                      struct osc_brw_async_args *aa)
1884 {
1885         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1886         enum cksum_types cksum_type;
1887         obd_dif_csum_fn *fn = NULL;
1888         int sector_size = 0;
1889         __u32 new_cksum;
1890         char *msg;
1891         int rc;
1892
1893         if (server_cksum == client_cksum) {
1894                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1895                 return 0;
1896         }
1897
1898         if (aa->aa_cli->cl_checksum_dump)
1899                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1900                                     server_cksum, client_cksum);
1901
1902         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1903                                            oa->o_flags : 0);
1904
1905         switch (cksum_type) {
1906         case OBD_CKSUM_T10IP512:
1907                 fn = obd_dif_ip_fn;
1908                 sector_size = 512;
1909                 break;
1910         case OBD_CKSUM_T10IP4K:
1911                 fn = obd_dif_ip_fn;
1912                 sector_size = 4096;
1913                 break;
1914         case OBD_CKSUM_T10CRC512:
1915                 fn = obd_dif_crc_fn;
1916                 sector_size = 512;
1917                 break;
1918         case OBD_CKSUM_T10CRC4K:
1919                 fn = obd_dif_crc_fn;
1920                 sector_size = 4096;
1921                 break;
1922         default:
1923                 break;
1924         }
1925
1926         if (fn)
1927                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1928                                              aa->aa_page_count, aa->aa_ppga,
1929                                              OST_WRITE, fn, sector_size,
1930                                              &new_cksum, true);
1931         else
1932                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1933                                        aa->aa_ppga, OST_WRITE, cksum_type,
1934                                        &new_cksum);
1935
1936         if (rc < 0)
1937                 msg = "failed to calculate the client write checksum";
1938         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1939                 msg = "the server did not use the checksum type specified in "
1940                       "the original request - likely a protocol problem";
1941         else if (new_cksum == server_cksum)
1942                 msg = "changed on the client after we checksummed it - "
1943                       "likely false positive due to mmap IO (bug 11742)";
1944         else if (new_cksum == client_cksum)
1945                 msg = "changed in transit before arrival at OST";
1946         else
1947                 msg = "changed in transit AND doesn't match the original - "
1948                       "likely false positive due to mmap IO (bug 11742)";
1949
1950         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1951                            DFID " object "DOSTID" extent [%llu-%llu], original "
1952                            "client csum %x (type %x), server csum %x (type %x),"
1953                            " client csum now %x\n",
1954                            obd_name, msg, libcfs_nid2str(peer->nid),
1955                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1956                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1957                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1958                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1959                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1960                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1961                            client_cksum,
1962                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1963                            server_cksum, cksum_type, new_cksum);
1964         return 1;
1965 }
1966
1967 /* Note: rc enters this function as the number of bytes transferred */
1968 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1969 {
1970         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1971         struct client_obd *cli = aa->aa_cli;
1972         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1973         const struct lnet_process_id *peer =
1974                 &req->rq_import->imp_connection->c_peer;
1975         struct ost_body *body;
1976         u32 client_cksum = 0;
1977         struct inode *inode;
1978         unsigned int blockbits = 0, blocksize = 0;
1979
1980         ENTRY;
1981
1982         if (rc < 0 && rc != -EDQUOT) {
1983                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1984                 RETURN(rc);
1985         }
1986
1987         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1988         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1989         if (body == NULL) {
1990                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1991                 RETURN(-EPROTO);
1992         }
1993
1994         /* set/clear over quota flag for a uid/gid/projid */
1995         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1996             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1997                 unsigned qid[LL_MAXQUOTAS] = {
1998                                          body->oa.o_uid, body->oa.o_gid,
1999                                          body->oa.o_projid };
2000                 CDEBUG(D_QUOTA,
2001                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
2002                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
2003                        body->oa.o_valid, body->oa.o_flags);
2004                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2005                                 body->oa.o_flags);
2006         }
2007
2008         osc_update_grant(cli, body);
2009
2010         if (rc < 0)
2011                 RETURN(rc);
2012
2013         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2014                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
2015
2016         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2017                 if (rc > 0) {
2018                         CERROR("%s: unexpected positive size %d\n",
2019                                obd_name, rc);
2020                         RETURN(-EPROTO);
2021                 }
2022
2023                 if (req->rq_bulk != NULL &&
2024                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2025                         RETURN(-EAGAIN);
2026
2027                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2028                     check_write_checksum(&body->oa, peer, client_cksum,
2029                                          body->oa.o_cksum, aa))
2030                         RETURN(-EAGAIN);
2031
2032                 rc = check_write_rcs(req, aa->aa_requested_nob,
2033                                      aa->aa_nio_count, aa->aa_page_count,
2034                                      aa->aa_ppga);
2035                 GOTO(out, rc);
2036         }
2037
2038         /* The rest of this function executes only for OST_READs */
2039
2040         if (req->rq_bulk == NULL) {
2041                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2042                                           RCL_SERVER);
2043                 LASSERT(rc == req->rq_status);
2044         } else {
2045                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2046                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2047         }
2048         if (rc < 0)
2049                 GOTO(out, rc = -EAGAIN);
2050
2051         if (rc > aa->aa_requested_nob) {
2052                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2053                        rc, aa->aa_requested_nob);
2054                 RETURN(-EPROTO);
2055         }
2056
2057         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2058                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2059                        rc, req->rq_bulk->bd_nob_transferred);
2060                 RETURN(-EPROTO);
2061         }
2062
2063         if (req->rq_bulk == NULL) {
2064                 /* short io */
2065                 int nob, pg_count, i = 0;
2066                 unsigned char *buf;
2067
2068                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2069                 pg_count = aa->aa_page_count;
2070                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2071                                                    rc);
2072                 nob = rc;
2073                 while (nob > 0 && pg_count > 0) {
2074                         unsigned char *ptr;
2075                         int count = aa->aa_ppga[i]->count > nob ?
2076                                     nob : aa->aa_ppga[i]->count;
2077
2078                         CDEBUG(D_CACHE, "page %p count %d\n",
2079                                aa->aa_ppga[i]->pg, count);
2080                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2081                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2082                                count);
2083                         kunmap_atomic((void *) ptr);
2084
2085                         buf += count;
2086                         nob -= count;
2087                         i++;
2088                         pg_count--;
2089                 }
2090         }
2091
2092         if (rc < aa->aa_requested_nob)
2093                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2094
2095         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2096                 static int cksum_counter;
2097                 u32 server_cksum = body->oa.o_cksum;
2098                 int nob = rc;
2099                 char *via = "";
2100                 char *router = "";
2101                 enum cksum_types cksum_type;
2102                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2103                         body->oa.o_flags : 0;
2104
2105                 cksum_type = obd_cksum_type_unpack(o_flags);
2106                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2107                                           aa->aa_page_count, aa->aa_ppga,
2108                                           OST_READ, &client_cksum, false);
2109                 if (rc < 0)
2110                         GOTO(out, rc);
2111
2112                 if (req->rq_bulk != NULL &&
2113                     peer->nid != req->rq_bulk->bd_sender) {
2114                         via = " via ";
2115                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2116                 }
2117
2118                 if (server_cksum != client_cksum) {
2119                         struct ost_body *clbody;
2120                         __u32 client_cksum2;
2121                         u32 page_count = aa->aa_page_count;
2122
2123                         osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2124                                              page_count, aa->aa_ppga,
2125                                              OST_READ, &client_cksum2, true);
2126                         clbody = req_capsule_client_get(&req->rq_pill,
2127                                                         &RMF_OST_BODY);
2128                         if (cli->cl_checksum_dump)
2129                                 dump_all_bulk_pages(&clbody->oa, page_count,
2130                                                     aa->aa_ppga, server_cksum,
2131                                                     client_cksum);
2132
2133                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2134                                            "%s%s%s inode "DFID" object "DOSTID
2135                                            " extent [%llu-%llu], client %x/%x, "
2136                                            "server %x, cksum_type %x\n",
2137                                            obd_name,
2138                                            libcfs_nid2str(peer->nid),
2139                                            via, router,
2140                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2141                                                 clbody->oa.o_parent_seq : 0ULL,
2142                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2143                                                 clbody->oa.o_parent_oid : 0,
2144                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2145                                                 clbody->oa.o_parent_ver : 0,
2146                                            POSTID(&body->oa.o_oi),
2147                                            aa->aa_ppga[0]->off,
2148                                            aa->aa_ppga[page_count-1]->off +
2149                                            aa->aa_ppga[page_count-1]->count - 1,
2150                                            client_cksum, client_cksum2,
2151                                            server_cksum, cksum_type);
2152                         cksum_counter = 0;
2153                         aa->aa_oa->o_cksum = client_cksum;
2154                         rc = -EAGAIN;
2155                 } else {
2156                         cksum_counter++;
2157                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2158                         rc = 0;
2159                 }
2160         } else if (unlikely(client_cksum)) {
2161                 static int cksum_missed;
2162
2163                 cksum_missed++;
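                /* (x & -x) == x holds only for powers of two, so the message
                 * below is rate-limited to the 1st, 2nd, 4th, 8th, ... miss.
                 */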
2164                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2165                         CERROR("%s: checksum %u requested from %s but not sent\n",
2166                                obd_name, cksum_missed,
2167                                libcfs_nid2str(peer->nid));
2168         } else {
2169                 rc = 0;
2170         }
2171
2172         inode = page2inode(aa->aa_ppga[0]->pg);
2173         if (inode == NULL) {
2174                 /* Try to get a reference to the inode from the cl_page
2175                  * if we are dealing with direct IO, as the pages handled
2176                  * here are not actual page cache pages.
2177                  */
2178                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2179
2180                 inode = oap2cl_page(oap)->cp_inode;
2181                 if (inode) {
2182                         blockbits = inode->i_blkbits;
2183                         blocksize = 1 << blockbits;
2184                 }
2185         }
2186         if (inode && IS_ENCRYPTED(inode)) {
2187                 int idx;
2188
2189                 if (!llcrypt_has_encryption_key(inode)) {
2190                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2191                         GOTO(out, rc);
2192                 }
2193                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2194                         struct brw_page *pg = aa->aa_ppga[idx];
2195                         unsigned int offs = 0;
2196
2197                         while (offs < PAGE_SIZE) {
2198                                 /* do not decrypt if page is all 0s */
2199                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2200                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2201                                         /* if the page is empty, forward this
2202                                          * info to upper layers (ll_io_zero_page)
2203                                          * by clearing PagePrivate2
2204                                          */
2205                                         if (!offs)
2206                                                 ClearPagePrivate2(pg->pg);
2207                                         break;
2208                                 }
2209
2210                                 if (blockbits) {
2211                                         /* This is the direct IO case. Directly
2212                                          * call the decrypt function that takes
2213                                          * the inode as an input parameter. The
2214                                          * page does not need to be locked.
2215                                          */
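                                        /* Worked example (illustrative):
                                         * with PAGE_SHIFT == 12 and
                                         * blockbits == 9 (512-byte blocks),
                                         * pg->off == 8192 gives page index
                                         * 2, so lblk_num starts at
                                         * 2 << 3 == 16, plus one block per
                                         * 512 bytes of offs.
                                         */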
2216                                         u64 lblk_num =
2217                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2218                                                      (PAGE_SHIFT - blockbits)) +
2219                                                        (offs >> blockbits);
2220                                         unsigned int i;
2221
2222                                         for (i = offs;
2223                                              i < offs +
2224                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2225                                              i += blocksize, lblk_num++) {
2226                                                 rc =
2227                                                   llcrypt_decrypt_block_inplace(
2228                                                           inode, pg->pg,
2229                                                           blocksize, i,
2230                                                           lblk_num);
2231                                                 if (rc)
2232                                                         break;
2233                                         }
2234                                 } else {
2235                                         rc = llcrypt_decrypt_pagecache_blocks(
2236                                                 pg->pg,
2237                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2238                                                 offs);
2239                                 }
2240                                 if (rc)
2241                                         GOTO(out, rc);
2242
2243                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2244                         }
2245                 }
2246         }
2247
2248 out:
2249         if (rc >= 0)
2250                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2251                                      aa->aa_oa, &body->oa);
2252
2253         RETURN(rc);
2254 }
2255
2256 static int osc_brw_redo_request(struct ptlrpc_request *request,
2257                                 struct osc_brw_async_args *aa, int rc)
2258 {
2259         struct ptlrpc_request *new_req;
2260         struct osc_brw_async_args *new_aa;
2261         struct osc_async_page *oap;
2262         ENTRY;
2263
2264         /* The message below is checked in replay-ost-single.sh test_8ae */
2265         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2266                   "redo for recoverable error %d", rc);
2267
2268         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2269                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2270                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2271                                   aa->aa_ppga, &new_req, 1);
2272         if (rc)
2273                 RETURN(rc);
2274
2275         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2276                 if (oap->oap_request != NULL) {
2277                         LASSERTF(request == oap->oap_request,
2278                                  "request %p != oap_request %p\n",
2279                                  request, oap->oap_request);
2280                 }
2281         }
2282         /*
2283          * New request takes over pga and oaps from old request.
2284          * Note that copying a list_head doesn't work, need to move it...
2285          */
2286         aa->aa_resends++;
2287         new_req->rq_interpret_reply = request->rq_interpret_reply;
2288         new_req->rq_async_args = request->rq_async_args;
2289         new_req->rq_commit_cb = request->rq_commit_cb;
2290         /* cap the resend delay to the current request timeout; this is similar
2291          * to what ptlrpc does (see after_reply()) */
2292         if (aa->aa_resends > new_req->rq_timeout)
2293                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2294         else
2295                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
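        /* e.g. with rq_timeout == 10s, the 3rd resend is delayed by 3s while
         * the 12th and later resends are capped at a 10s delay. */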
2296         new_req->rq_generation_set = 1;
2297         new_req->rq_import_generation = request->rq_import_generation;
2298
2299         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2300
2301         INIT_LIST_HEAD(&new_aa->aa_oaps);
2302         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2303         INIT_LIST_HEAD(&new_aa->aa_exts);
2304         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2305         new_aa->aa_resends = aa->aa_resends;
2306
2307         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2308                 if (oap->oap_request) {
2309                         ptlrpc_req_finished(oap->oap_request);
2310                         oap->oap_request = ptlrpc_request_addref(new_req);
2311                 }
2312         }
2313
2314         /* XXX: This code will run into problems if we're going to support
2315          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
2316          * and waiting for all of them to finish. We should inherit the
2317          * request set from the old request. */
2318         ptlrpcd_add_req(new_req);
2319
2320         DEBUG_REQ(D_INFO, new_req, "new request");
2321         RETURN(0);
2322 }
2323
2324 /*
2325  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2326  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2327  * fine for our small page arrays and doesn't require allocation.  It's an
2328  * insertion sort that swaps elements that are strides apart, shrinking the
2329  * stride down until it's 1 and the array is sorted.
2330  */
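/* Illustrative stride trace: for num == 100 the stride grows 1, 4, 13, 40,
 * 121 (stopping once >= num), and the sorting passes then run with strides
 * 40, 13, 4 and finally 1.
 */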
2331 static void sort_brw_pages(struct brw_page **array, int num)
2332 {
2333         int stride, i, j;
2334         struct brw_page *tmp;
2335
2336         if (num == 1)
2337                 return;
2338         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2339                 ;
2340
2341         do {
2342                 stride /= 3;
2343                 for (i = stride ; i < num ; i++) {
2344                         tmp = array[i];
2345                         j = i;
2346                         while (j >= stride && array[j - stride]->off > tmp->off) {
2347                                 array[j] = array[j - stride];
2348                                 j -= stride;
2349                         }
2350                         array[j] = tmp;
2351                 }
2352         } while (stride > 1);
2353 }
2354
2355 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2356 {
2357         LASSERT(ppga != NULL);
2358         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2359 }
2360
2361 static int brw_interpret(const struct lu_env *env,
2362                          struct ptlrpc_request *req, void *args, int rc)
2363 {
2364         struct osc_brw_async_args *aa = args;
2365         struct osc_extent *ext;
2366         struct osc_extent *tmp;
2367         struct client_obd *cli = aa->aa_cli;
2368         unsigned long transferred = 0;
2369
2370         ENTRY;
2371
2372         rc = osc_brw_fini_request(req, rc);
2373         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2374
2375         /* restore clear text pages */
2376         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2377
2378         /*
2379          * When server returns -EINPROGRESS, client should always retry
2380          * regardless of the number of times the bulk was resent already.
2381          */
2382         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2383                 if (req->rq_import_generation !=
2384                     req->rq_import->imp_generation) {
2385                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2386                                ""DOSTID", rc = %d.\n",
2387                                req->rq_import->imp_obd->obd_name,
2388                                POSTID(&aa->aa_oa->o_oi), rc);
2389                 } else if (rc == -EINPROGRESS ||
2390                            client_should_resend(aa->aa_resends, aa->aa_cli)) {
2391                         rc = osc_brw_redo_request(req, aa, rc);
2392                 } else {
2393                         CERROR("%s: too many resent retries for object: "
2394                                "%llu:%llu, rc = %d.\n",
2395                                req->rq_import->imp_obd->obd_name,
2396                                POSTID(&aa->aa_oa->o_oi), rc);
2397                 }
2398
2399                 if (rc == 0)
2400                         RETURN(0);
2401                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2402                         rc = -EIO;
2403         }
2404
2405         if (rc == 0) {
2406                 struct obdo *oa = aa->aa_oa;
2407                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2408                 unsigned long valid = 0;
2409                 struct cl_object *obj;
2410                 struct osc_async_page *last;
2411
2412                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2413                 obj = osc2cl(last->oap_obj);
2414
2415                 cl_object_attr_lock(obj);
2416                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2417                         attr->cat_blocks = oa->o_blocks;
2418                         valid |= CAT_BLOCKS;
2419                 }
2420                 if (oa->o_valid & OBD_MD_FLMTIME) {
2421                         attr->cat_mtime = oa->o_mtime;
2422                         valid |= CAT_MTIME;
2423                 }
2424                 if (oa->o_valid & OBD_MD_FLATIME) {
2425                         attr->cat_atime = oa->o_atime;
2426                         valid |= CAT_ATIME;
2427                 }
2428                 if (oa->o_valid & OBD_MD_FLCTIME) {
2429                         attr->cat_ctime = oa->o_ctime;
2430                         valid |= CAT_CTIME;
2431                 }
2432
2433                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2434                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2435                         loff_t last_off = last->oap_count + last->oap_obj_off +
2436                                 last->oap_page_off;
2437
2438                         /* Change the file size if this is an out-of-quota or
2439                          * direct IO write and it extends the file size */
2440                         if (loi->loi_lvb.lvb_size < last_off) {
2441                                 attr->cat_size = last_off;
2442                                 valid |= CAT_SIZE;
2443                         }
2444                         /* Extend KMS if it's not a lockless write */
2445                         if (loi->loi_kms < last_off &&
2446                             oap2osc_page(last)->ops_srvlock == 0) {
2447                                 attr->cat_kms = last_off;
2448                                 valid |= CAT_KMS;
2449                         }
2450                 }
2451
2452                 if (valid != 0)
2453                         cl_object_attr_update(env, obj, attr, valid);
2454                 cl_object_attr_unlock(obj);
2455         }
2456         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2457         aa->aa_oa = NULL;
2458
2459         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2460                 osc_inc_unstable_pages(req);
2461
2462         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2463                 list_del_init(&ext->oe_link);
2464                 osc_extent_finish(env, ext, 1,
2465                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2466         }
2467         LASSERT(list_empty(&aa->aa_exts));
2468         LASSERT(list_empty(&aa->aa_oaps));
2469
2470         transferred = (req->rq_bulk == NULL ? /* short io */
2471                        aa->aa_requested_nob :
2472                        req->rq_bulk->bd_nob_transferred);
2473
2474         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2475         ptlrpc_lprocfs_brw(req, transferred);
2476
2477         spin_lock(&cli->cl_loi_list_lock);
2478         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2479          * is called so we know whether to go to sync BRWs or wait for more
2480          * RPCs to complete */
2481         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2482                 cli->cl_w_in_flight--;
2483         else
2484                 cli->cl_r_in_flight--;
2485         osc_wake_cache_waiters(cli);
2486         spin_unlock(&cli->cl_loi_list_lock);
2487
2488         osc_io_unplug(env, cli, NULL);
2489         RETURN(rc);
2490 }
2491
2492 static void brw_commit(struct ptlrpc_request *req)
2493 {
2494         /* If osc_inc_unstable_pages (via osc_extent_finish) races with this
2495          * function being called via the rq_commit_cb, we need to ensure
2496          * osc_dec_unstable_pages is still called. Otherwise unstable
2497          * pages may be leaked. */
2498         spin_lock(&req->rq_lock);
2499         if (likely(req->rq_unstable)) {
2500                 req->rq_unstable = 0;
2501                 spin_unlock(&req->rq_lock);
2502
2503                 osc_dec_unstable_pages(req);
2504         } else {
2505                 req->rq_committed = 1;
2506                 spin_unlock(&req->rq_lock);
2507         }
2508 }
2509
2510 /**
2511  * Build an RPC from the list of extents @ext_list. The caller must ensure
2512  * that the total number of pages in this list does NOT exceed the max pages
2513  * per RPC. Extents in the list must be in the OES_RPC state.
2514  */
2515 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2516                   struct list_head *ext_list, int cmd)
2517 {
2518         struct ptlrpc_request           *req = NULL;
2519         struct osc_extent               *ext;
2520         struct brw_page                 **pga = NULL;
2521         struct osc_brw_async_args       *aa = NULL;
2522         struct obdo                     *oa = NULL;
2523         struct osc_async_page           *oap;
2524         struct osc_object               *obj = NULL;
2525         struct cl_req_attr              *crattr = NULL;
2526         loff_t                          starting_offset = OBD_OBJECT_EOF;
2527         loff_t                          ending_offset = 0;
2528         /* '1' for consistency with code that checks !mpflag to restore */
2529         int mpflag = 1;
2530         int                             mem_tight = 0;
2531         int                             page_count = 0;
2532         bool                            soft_sync = false;
2533         bool                            ndelay = false;
2534         int                             i;
2535         int                             grant = 0;
2536         int                             rc;
2537         __u32                           layout_version = 0;
2538         LIST_HEAD(rpc_list);
2539         struct ost_body                 *body;
2540         ENTRY;
2541         LASSERT(!list_empty(ext_list));
2542
2543         /* add pages into rpc_list to build BRW rpc */
2544         list_for_each_entry(ext, ext_list, oe_link) {
2545                 LASSERT(ext->oe_state == OES_RPC);
2546                 mem_tight |= ext->oe_memalloc;
2547                 grant += ext->oe_grants;
2548                 page_count += ext->oe_nr_pages;
2549                 layout_version = max(layout_version, ext->oe_layout_version);
2550                 if (obj == NULL)
2551                         obj = ext->oe_obj;
2552         }
2553
2554         soft_sync = osc_over_unstable_soft_limit(cli);
2555         if (mem_tight)
2556                 mpflag = memalloc_noreclaim_save();
2557
2558         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2559         if (pga == NULL)
2560                 GOTO(out, rc = -ENOMEM);
2561
2562         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2563         if (oa == NULL)
2564                 GOTO(out, rc = -ENOMEM);
2565
2566         i = 0;
2567         list_for_each_entry(ext, ext_list, oe_link) {
2568                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2569                         if (mem_tight)
2570                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2571                         if (soft_sync)
2572                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2573                         pga[i] = &oap->oap_brw_page;
2574                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2575                         i++;
2576
2577                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2578                         if (starting_offset == OBD_OBJECT_EOF ||
2579                             starting_offset > oap->oap_obj_off)
2580                                 starting_offset = oap->oap_obj_off;
2581                         else
2582                                 LASSERT(oap->oap_page_off == 0);
2583                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2584                                 ending_offset = oap->oap_obj_off +
2585                                                 oap->oap_count;
2586                         else
2587                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2588                                         PAGE_SIZE);
2589                 }
2590                 if (ext->oe_ndelay)
2591                         ndelay = true;
2592         }
2593
2594         /* first page in the list */
2595         oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2596
2597         crattr = &osc_env_info(env)->oti_req_attr;
2598         memset(crattr, 0, sizeof(*crattr));
2599         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2600         crattr->cra_flags = ~0ULL;
2601         crattr->cra_page = oap2cl_page(oap);
2602         crattr->cra_oa = oa;
2603         cl_req_attr_set(env, osc2cl(obj), crattr);
2604
2605         if (cmd == OBD_BRW_WRITE) {
2606                 oa->o_grant_used = grant;
2607                 if (layout_version > 0) {
2608                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2609                                PFID(&oa->o_oi.oi_fid), layout_version);
2610
2611                         oa->o_layout_version = layout_version;
2612                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2613                 }
2614         }
2615
2616         sort_brw_pages(pga, page_count);
2617         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2618         if (rc != 0) {
2619                 CERROR("prep_req failed: %d\n", rc);
2620                 GOTO(out, rc);
2621         }
2622
2623         req->rq_commit_cb = brw_commit;
2624         req->rq_interpret_reply = brw_interpret;
2625         req->rq_memalloc = mem_tight != 0;
2626         oap->oap_request = ptlrpc_request_addref(req);
2627         if (ndelay) {
2628                 req->rq_no_resend = req->rq_no_delay = 1;
2629                 /* We should probably set a shorter timeout value to handle
2630                  * ETIMEDOUT in brw_interpret() correctly. */
2631                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2632         }
2633
2634         /* Need to update the timestamps after the request is built in case
2635          * we race with setattr (locally or in a queue at the OST).  If the
2636          * OST gets a later setattr before an earlier BRW (as determined by the
2637          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2638          * is no obvious way to do this in a single call.  bug 10150 */
2639         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2640         crattr->cra_oa = &body->oa;
2641         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2642         cl_req_attr_set(env, osc2cl(obj), crattr);
2643         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2644
2645         aa = ptlrpc_req_async_args(aa, req);
2646         INIT_LIST_HEAD(&aa->aa_oaps);
2647         list_splice_init(&rpc_list, &aa->aa_oaps);
2648         INIT_LIST_HEAD(&aa->aa_exts);
2649         list_splice_init(ext_list, &aa->aa_exts);
2650
2651         spin_lock(&cli->cl_loi_list_lock);
2652         starting_offset >>= PAGE_SHIFT;
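        /* starting_offset is now a page index: the offset histograms
         * below are tallied in pages, not bytes */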
2653         if (cmd == OBD_BRW_READ) {
2654                 cli->cl_r_in_flight++;
2655                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2656                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2657                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2658                                       starting_offset + 1);
2659         } else {
2660                 cli->cl_w_in_flight++;
2661                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2662                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2663                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2664                                       starting_offset + 1);
2665         }
2666         spin_unlock(&cli->cl_loi_list_lock);
2667
2668         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2669                   page_count, aa, cli->cl_r_in_flight,
2670                   cli->cl_w_in_flight);
2671         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2672
2673         ptlrpcd_add_req(req);
2674         rc = 0;
2675         EXIT;
2676
2677 out:
2678         if (mem_tight)
2679                 memalloc_noreclaim_restore(mpflag);
2680
2681         if (rc != 0) {
2682                 LASSERT(req == NULL);
2683
2684                 if (oa)
2685                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2686                 if (pga) {
2687                         osc_release_bounce_pages(pga, page_count);
2688                         osc_release_ppga(pga, page_count);
2689                 }
2690                 /* This should happen rarely and is pretty bad; it makes
2691                  * the pending list not follow the dirty order.
2692                  */
2693                 while ((ext = list_first_entry_or_null(ext_list,
2694                                                        struct osc_extent,
2695                                                        oe_link)) != NULL) {
2696                         list_del_init(&ext->oe_link);
2697                         osc_extent_finish(env, ext, 0, rc);
2698                 }
2699         }
2700         RETURN(rc);
2701 }
2702
2703 /* This is to refresh our lock in the face of no RPC traffic. */
2704 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2705 {
2706         struct ptlrpc_request *req;
2707         struct obdo oa;
2708         struct brw_page bpg = { .off = start, .count = 1};
2709         struct brw_page *pga = &bpg;
2710         int rc;
2711
2712         memset(&oa, 0, sizeof(oa));
2713         oa.o_oi = osc->oo_oinfo->loi_oi;
2714         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2715         /* For updated servers - don't do a read */
2716         oa.o_flags = OBD_FL_NORPC;
2717
2718         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2719                                   &req, 0);
2720
2721         /* If we succeeded, we ship it off; if not, there's no point in
2722          * doing anything.  Also no resends.
2723          * No interpret callback, no commit callback.
2724          */
2725         if (!rc) {
2726                 req->rq_no_resend = 1;
2727                 ptlrpcd_add_req(req);
2728         }
2729 }
2730
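/* Attach @data to @lock->l_ast_data if it is not set yet.  Returns 1 if
 * l_ast_data now points at @data (whether we just set it or it already
 * matched), and 0 if the lock is already owned by another object. */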
2731 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2732 {
2733         int set = 0;
2734
2735         LASSERT(lock != NULL);
2736
2737         lock_res_and_lock(lock);
2738
2739         if (lock->l_ast_data == NULL)
2740                 lock->l_ast_data = data;
2741         if (lock->l_ast_data == data)
2742                 set = 1;
2743
2744         unlock_res_and_lock(lock);
2745
2746         return set;
2747 }
2748
2749 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2750                      void *cookie, struct lustre_handle *lockh,
2751                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2752                      int errcode)
2753 {
2754         bool intent = *flags & LDLM_FL_HAS_INTENT;
2755         int rc;
2756         ENTRY;
2757
2758         /* The request was created before the ldlm_cli_enqueue() call. */
2759         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2760                 struct ldlm_reply *rep;
2761
2762                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2763                 LASSERT(rep != NULL);
2764
2765                 rep->lock_policy_res1 =
2766                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2767                 if (rep->lock_policy_res1)
2768                         errcode = rep->lock_policy_res1;
2769                 if (!speculative)
2770                         *flags |= LDLM_FL_LVB_READY;
2771         } else if (errcode == ELDLM_OK) {
2772                 *flags |= LDLM_FL_LVB_READY;
2773         }
2774
2775         /* Call the update callback. */
2776         rc = (*upcall)(cookie, lockh, errcode);
2777
2778         /* release the reference taken in ldlm_cli_enqueue() */
2779         if (errcode == ELDLM_LOCK_MATCHED)
2780                 errcode = ELDLM_OK;
2781         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2782                 ldlm_lock_decref(lockh, mode);
2783
2784         RETURN(rc);
2785 }
2786
2787 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2788                           void *args, int rc)
2789 {
2790         struct osc_enqueue_args *aa = args;
2791         struct ldlm_lock *lock;
2792         struct lustre_handle *lockh = &aa->oa_lockh;
2793         enum ldlm_mode mode = aa->oa_mode;
2794         struct ost_lvb *lvb = aa->oa_lvb;
2795         __u32 lvb_len = sizeof(*lvb);
2796         __u64 flags = 0;
2797         struct ldlm_enqueue_info einfo = {
2798                 .ei_type = aa->oa_type,
2799                 .ei_mode = mode,
2800         };
2801
2802         ENTRY;
2803
2804         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2805          * be valid. */
2806         lock = ldlm_handle2lock(lockh);
2807         LASSERTF(lock != NULL,
2808                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2809                  lockh->cookie, req, aa);
2810
2811         /* Take an additional reference so that a blocking AST that
2812          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2813          * to arrive after an upcall has been executed by
2814          * osc_enqueue_fini(). */
2815         ldlm_lock_addref(lockh, mode);
2816
2817         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2818         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2819
2820         /* Let the CP AST grant the lock first. */
2821         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2822
2823         if (aa->oa_speculative) {
2824                 LASSERT(aa->oa_lvb == NULL);
2825                 LASSERT(aa->oa_flags == NULL);
2826                 aa->oa_flags = &flags;
2827         }
2828
2829         /* Complete obtaining the lock procedure. */
2830         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2831                                    lvb, lvb_len, lockh, rc, false);
2832         /* Complete osc stuff. */
2833         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2834                               aa->oa_flags, aa->oa_speculative, rc);
2835
2836         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2837
2838         ldlm_lock_decref(lockh, mode);
2839         LDLM_LOCK_PUT(lock);
2840         RETURN(rc);
2841 }
2842
2843 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2844  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2845  * with other synchronous requests; however, keeping some locks while trying to
2846  * obtain others may take a considerable amount of time in case of OST failure,
2847  * and when other sync requests cannot get a lock released by a client, that
2848  * client is evicted from the cluster -- such scenarios make life difficult, so
2849  * release locks just after they are obtained. */
2850 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2851                      __u64 *flags, union ldlm_policy_data *policy,
2852                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2853                      void *cookie, struct ldlm_enqueue_info *einfo,
2854                      struct ptlrpc_request_set *rqset, int async,
2855                      bool speculative)
2856 {
2857         struct obd_device *obd = exp->exp_obd;
2858         struct lustre_handle lockh = { 0 };
2859         struct ptlrpc_request *req = NULL;
2860         int intent = *flags & LDLM_FL_HAS_INTENT;
2861         __u64 match_flags = *flags;
2862         enum ldlm_mode mode;
2863         int rc;
2864         ENTRY;
2865
2866         /* Filesystem lock extents are extended to page boundaries so that
2867          * dealing with the page cache is a little smoother.  */
2868         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2869         policy->l_extent.end |= ~PAGE_MASK;
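        /* For example, with 4KiB pages (PAGE_MASK == ~0xfff), a byte range
         * of [5000, 6000] is widened to the page-aligned [4096, 8191]. */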
2870
2871         /* Next, search for already existing extent locks that will cover us */
2872         /* If we're trying to read, we also search for an existing PW lock.  The
2873          * VFS and page cache already protect us locally, so lots of readers/
2874          * writers can share a single PW lock.
2875          *
2876          * There are problems with conversion deadlocks, so instead of
2877          * converting a read lock to a write lock, we'll just enqueue a new
2878          * one.
2879          *
2880          * At some point we should cancel the read lock instead of making the
2881          * server send us a blocking callback, but there are problems with
2882          * canceling locks out from under other users right now, too. */
2883         mode = einfo->ei_mode;
2884         if (einfo->ei_mode == LCK_PR)
2885                 mode |= LCK_PW;
2886         /* Normal lock requests must wait for the LVB to be ready before
2887          * matching a lock; speculative lock requests do not need to,
2888          * because they will not actually use the lock. */
2889         if (!speculative)
2890                 match_flags |= LDLM_FL_LVB_READY;
2891         if (intent != 0)
2892                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2893         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2894                                einfo->ei_type, policy, mode, &lockh);
2895         if (mode) {
2896                 struct ldlm_lock *matched;
2897
2898                 if (*flags & LDLM_FL_TEST_LOCK)
2899                         RETURN(ELDLM_OK);
2900
2901                 matched = ldlm_handle2lock(&lockh);
2902                 if (speculative) {
2903                         /* This DLM lock request is speculative, and does not
2904                          * have an associated IO request. Therefore if there
2905                          * is already a DLM lock, it will just inform the
2906                          * caller to cancel the request for this stripe. */
2907                         lock_res_and_lock(matched);
2908                         if (ldlm_extent_equal(&policy->l_extent,
2909                             &matched->l_policy_data.l_extent))
2910                                 rc = -EEXIST;
2911                         else
2912                                 rc = -ECANCELED;
2913                         unlock_res_and_lock(matched);
2914
2915                         ldlm_lock_decref(&lockh, mode);
2916                         LDLM_LOCK_PUT(matched);
2917                         RETURN(rc);
2918                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2919                         *flags |= LDLM_FL_LVB_READY;
2920
2921                         /* We already have a lock, and it's referenced. */
2922                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2923
2924                         ldlm_lock_decref(&lockh, mode);
2925                         LDLM_LOCK_PUT(matched);
2926                         RETURN(ELDLM_OK);
2927                 } else {
2928                         ldlm_lock_decref(&lockh, mode);
2929                         LDLM_LOCK_PUT(matched);
2930                 }
2931         }
2932
2933         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2934                 RETURN(-ENOLCK);
2935
2936         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2937         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2938
2939         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2940                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2941         if (async) {
2942                 if (!rc) {
2943                         struct osc_enqueue_args *aa;
2944                         aa = ptlrpc_req_async_args(aa, req);
2945                         aa->oa_exp         = exp;
2946                         aa->oa_mode        = einfo->ei_mode;
2947                         aa->oa_type        = einfo->ei_type;
2948                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2949                         aa->oa_upcall      = upcall;
2950                         aa->oa_cookie      = cookie;
2951                         aa->oa_speculative = speculative;
2952                         if (!speculative) {
2953                                 aa->oa_flags  = flags;
2954                                 aa->oa_lvb    = lvb;
2955                         } else {
2956                                 /* Speculative locks essentially enqueue
2957                                  * a DLM lock in advance, so we don't care
2958                                  * about the result of the enqueue. */
2959                                 aa->oa_lvb    = NULL;
2960                                 aa->oa_flags  = NULL;
2961                         }
2962
2963                         req->rq_interpret_reply = osc_enqueue_interpret;
2964                         ptlrpc_set_add_req(rqset, req);
2965                 }
2966                 RETURN(rc);
2967         }
2968
2969         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2970                               flags, speculative, rc);
2971
2972         RETURN(rc);
2973 }
2974
2975 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2976                    struct ldlm_res_id *res_id, enum ldlm_type type,
2977                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2978                    __u64 *flags, struct osc_object *obj,
2979                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2980 {
2981         struct obd_device *obd = exp->exp_obd;
2982         __u64 lflags = *flags;
2983         enum ldlm_mode rc;
2984         ENTRY;
2985
2986         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2987                 RETURN(-EIO);
2988
2989         /* Filesystem lock extents are extended to page boundaries so that
2990          * dealing with the page cache is a little smoother */
2991         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2992         policy->l_extent.end |= ~PAGE_MASK;
2993
2994         /* Next, search for already existing extent locks that will cover us */
2995         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2996                                         res_id, type, policy, mode, lockh,
2997                                         match_flags);
2998         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2999                 RETURN(rc);
3000
3001         if (obj != NULL) {
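                /* Attach the osc_object to the matched lock and seed the
                 * object's attributes from the lock's LVB exactly once;
                 * ldlm_set_lvb_cached() marks that as done. */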
3002                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3003
3004                 LASSERT(lock != NULL);
3005                 if (osc_set_lock_data(lock, obj)) {
3006                         lock_res_and_lock(lock);
3007                         if (!ldlm_is_lvb_cached(lock)) {
3008                                 LASSERT(lock->l_ast_data == obj);
3009                                 osc_lock_lvb_update(env, obj, lock, NULL);
3010                                 ldlm_set_lvb_cached(lock);
3011                         }
3012                         unlock_res_and_lock(lock);
3013                 } else {
3014                         ldlm_lock_decref(lockh, rc);
3015                         rc = 0;
3016                 }
3017                 LDLM_LOCK_PUT(lock);
3018         }
3019         RETURN(rc);
3020 }
3021
3022 static int osc_statfs_interpret(const struct lu_env *env,
3023                                 struct ptlrpc_request *req, void *args, int rc)
3024 {
3025         struct osc_async_args *aa = args;
3026         struct obd_statfs *msfs;
3027
3028         ENTRY;
3029         if (rc == -EBADR)
3030                 /*
3031                  * The request has in fact never been sent due to issues at
3032                  * a higher level (LOV).  Exit immediately since the caller
3033                  * is aware of the problem and takes care of the clean up.
3034                  */
3035                 RETURN(rc);
3036
3037         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3038             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3039                 GOTO(out, rc = 0);
3040
3041         if (rc != 0)
3042                 GOTO(out, rc);
3043
3044         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3045         if (msfs == NULL)
3046                 GOTO(out, rc = -EPROTO);
3047
3048         *aa->aa_oi->oi_osfs = *msfs;
3049 out:
3050         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3051
3052         RETURN(rc);
3053 }
3054
3055 static int osc_statfs_async(struct obd_export *exp,
3056                             struct obd_info *oinfo, time64_t max_age,
3057                             struct ptlrpc_request_set *rqset)
3058 {
3059         struct obd_device     *obd = class_exp2obd(exp);
3060         struct ptlrpc_request *req;
3061         struct osc_async_args *aa;
3062         int rc;
3063         ENTRY;
3064
3065         if (obd->obd_osfs_age >= max_age) {
3066                 CDEBUG(D_SUPER,
3067                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3068                        obd->obd_name, &obd->obd_osfs,
3069                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3070                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3071                 spin_lock(&obd->obd_osfs_lock);
3072                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3073                 spin_unlock(&obd->obd_osfs_lock);
3074                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3075                 if (oinfo->oi_cb_up)
3076                         oinfo->oi_cb_up(oinfo, 0);
3077
3078                 RETURN(0);
3079         }
3080
3081         /* We could possibly pass max_age in the request (as an absolute
3082          * timestamp or a "seconds.usec ago") so the target can avoid doing
3083          * extra calls into the filesystem when they aren't necessary (e.g.
3084          * during mount this would help a bit).  Having relative timestamps
3085          * is not so great if request processing is slow, while absolute
3086          * timestamps are not ideal because they need time synchronization. */
3087         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3088         if (req == NULL)
3089                 RETURN(-ENOMEM);
3090
3091         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3092         if (rc) {
3093                 ptlrpc_request_free(req);
3094                 RETURN(rc);
3095         }
3096         ptlrpc_request_set_replen(req);
3097         req->rq_request_portal = OST_CREATE_PORTAL;
3098         ptlrpc_at_set_req_timeout(req);
3099
3100         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3101                 /* procfs requests must not block here, to avoid a deadlock */
3102                 req->rq_no_resend = 1;
3103                 req->rq_no_delay = 1;
3104         }
3105
3106         req->rq_interpret_reply = osc_statfs_interpret;
3107