/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_ioctl_old.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_osc.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_internal.h"
#include <lnet/lnet_rdma.h>

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

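/* Pack @oa into the outgoing request body, converting in-memory obdo
 * fields to their wire format according to what the import negotiated
 * at connect time. */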
static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

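/* Synchronous OST_GETATTR: send the attributes in @oa to identify the
 * object, wait for the reply, and unpack the returned attributes back
 * into @oa, adding the client's preferred BRW size as the blocksize. */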
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

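/* Synchronous OST_SETATTR: push the attributes in @oa to the OST and
 * unpack the server's view of the attributes back into @oa on success. */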
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

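/* Reply callback shared by the asynchronous setattr-style RPCs (setattr,
 * punch, fallocate): unpack the returned attributes into sa_oa on
 * success, then pass the final status to the caller's upcall. */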
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

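/* Non-blocking OST_SETATTR: if @rqset is NULL the request is handed to
 * ptlrpcd and the reply is ignored, otherwise it is added to @rqset and
 * @upcall(@cookie, rc) runs from osc_setattr_interpret() on completion. */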
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If @rqset is NULL, do not wait for the response; @upcall and @cookie
 * may also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

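/* Synchronous OST_CREATE.  Only objects in the echo sequence are expected
 * here (see the fid_seq_is_echo() assertion below), so this path is only
 * exercised by the echo client. */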
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

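/* Send an OST_PUNCH (truncate/hole-punch) request asynchronously via
 * ptlrpcd; @upcall(@cookie, rc) is invoked from osc_setattr_interpret()
 * once the reply arrives. */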
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the reply arrives
 * @cookie:     Opaque argument passed to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (standard preallocation) is currently supported;
 * other mode flags are not supported yet. ftruncate(2) and truncate(2)
 * are handled via a SETATTR request instead.
 *
 * Return: 0 on success, negative errno on failure.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

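/* Reply callback for OST_SYNC: copy the returned attributes into fa_oa,
 * refresh the osc object's cached blocks attribute, and hand the final
 * status to the caller's upcall. */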
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

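/* Issue an OST_SYNC (fsync) request for @obj.  The start/end of the range
 * to flush travel in the size/blocks fields of @oa, as noted below. */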
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matching @mode on the resource named
 * by @oa. Found locks are added to the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all:
         * there we still want to cancel locks in advance, and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

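/* Reply callback for OST_DESTROY: release the in-flight slot taken in
 * osc_can_send_destroy() and wake up any waiter in osc_destroy(). */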
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

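/* Try to reserve a slot for one more in-flight destroy RPC.  The counter
 * is optimistically incremented; if that exceeds cl_max_rpcs_in_flight
 * the increment is rolled back and 0 is returned, waking a waiter if the
 * counter dropped in the meantime. */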
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

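/* Fill in the dirty-accounting and grant fields of @oa (o_dirty,
 * o_undirty, o_grant, o_dropped) so that each BRW RPC tells the server
 * how much dirty cache and grant this client holds, and how much more
 * grant it would like. */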
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}
EXPORT_SYMBOL(osc_update_next_shrink);

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

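/* Reply callback for a grant-shrink request: on failure, return the grant
 * we tried to give back to cl_avail_grant; on success, account any extra
 * grant the server chose to include in the reply. */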
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

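/* Give a quarter of the available grant back to the server: move it from
 * cl_avail_grant into @oa and mark the obdo with OBD_FL_SHRINK_GRANT so
 * the server recognizes this as a voluntary return. */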
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

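/* Decide whether a grant-shrink RPC should be sent now: GRANT_SHRINK must
 * be supported and not disabled on the import, the shrink deadline must be
 * (almost) due, the import must be FULL, and more than one RPC worth of
 * grant must still be available. */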
static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

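/* Periodic work item: walk the registered clients, send up to
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, then re-arm the delayed
 * work for the earliest upcoming shrink deadline. */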
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
EXPORT_SYMBOL(osc_schedule_grant_work);

/**
 * Start the grant work handler, which returns grant to the server for
 * idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as inflight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->bp_count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->bp_page) +
                                (pga[i]->bp_off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
                        kunmap(pga[i]->bp_page);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->bp_count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->bp_count);
                kunmap(pga[i]->bp_page);
                i++;
        }
}

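/* Validate the per-niobuf return codes in a BRW_WRITE reply: fail if the
 * RC vector is missing or short, if any niobuf reports an error or a
 * nonzero "success" value, or if the bulk moved an unexpected number of
 * bytes. */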
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

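/* Two brw_pages may be merged into a single niobuf only if they are
 * contiguous in the file and carry compatible OBD_BRW_* flags. */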
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->bp_flag != p2->bp_flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC |
                                  OBD_BRW_SYS_RESOURCE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->bp_flag, p2->bp_flag);
                }
                return 0;
        }

        return (p1->bp_off + p1->bp_count == p2->bp_off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
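/* Compute a T10-PI style bulk checksum: generate per-sector DIF guard
 * tags for each page with @fn, accumulate the tags in a scratch page, and
 * hash them with the top-level algorithm to produce the final checksum. */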
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum, bool resend)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __be16 *guard_start;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        unsigned int bufsize = sizeof(cksum);
        int rc = 0, rc2;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__be16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        CDEBUG(D_PAGE | (resend ? D_HA : 0),
               "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
               guard_number, resend, nob, pg_count);

        while (nob > 0 && pg_count > 0) {
                int off = pga[i]->bp_off & ~PAGE_MASK;
                unsigned int count =
                        pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
                int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
                                        (off / sector_size);

                if (guards_needed > guard_number - used_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->bp_page);

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->bp_page);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
                                                  pga[i]->bp_off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (unlikely(resend))
                        CDEBUG(D_PAGE | D_HA,
                               "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
                               i, used, pga[i]->bp_off & ~PAGE_MASK, count,
                               (int)(used * sizeof(*guard_start)),
                               guard_start + used_number);
                if (rc)
                        break;

                used_number += used;
                nob -= pga[i]->bp_count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out_hash, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

out_hash:
        rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
        if (!rc)
                rc = rc2;
        if (rc == 0) {
                /* For sending we only compute the wrong checksum instead
                 * of corrupting the data so it is still correct on a redo */
                if (opc == OST_WRITE &&
                                CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                        cksum++;

                *check_sum = cksum;
        }
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

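/* Compute a plain bulk checksum by feeding each page fragment of the
 * transfer into the hash algorithm selected by @cksum_type. */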
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count =
                        pga[i]->bp_count > nob ? nob : pga[i]->bp_count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->bp_page);
                        int off = pga[i]->bp_off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->bp_page);
                }
                cfs_crypto_hash_update_page(req, pga[i]->bp_page,
                                            pga[i]->bp_off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
                               (int)(pga[i]->bp_off & ~PAGE_MASK));

                nob -= pga[i]->bp_count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

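/* Dispatch bulk checksumming: use the T10-PI path when @cksum_type maps
 * to a DIF checksum function, otherwise fall back to hashing the data
 * directly. */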
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum, bool resend)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum,
                                             resend);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

#ifdef CONFIG_LL_ENCRYPTION
/**
 * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
 * @srcpage:   The locked pagecache page containing the block(s) to encrypt
 * @dstpage:   The page to put encryption result
 * @len:       Total size of the block(s) to encrypt.  Must be a nonzero
 *             multiple of the filesystem's block size.
 * @offs:      Byte offset within @srcpage of the first block to encrypt.
 *             Must be a multiple of the filesystem's block size.
 * @gfp_flags: Memory allocation flags
 *
 * This overlay function is necessary to be able to provide our own bounce page.
 */
static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
                                                 struct page *dstpage,
                                                 unsigned int len,
                                                 unsigned int offs,
                                                 gfp_t gfp_flags)

{
        const struct inode *inode = srcpage->mapping->host;
        const unsigned int blockbits = inode->i_blkbits;
        const unsigned int blocksize = 1 << blockbits;
        u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
                (offs >> blockbits);
        unsigned int i;
        int err;

        if (unlikely(!dstpage))
                return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
                                                        gfp_flags);

        if (WARN_ON_ONCE(!PageLocked(srcpage)))
                return ERR_PTR(-EINVAL);

        if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
                return ERR_PTR(-EINVAL);

        /* Set PagePrivate2 for disambiguation in
         * osc_finalize_bounce_page().
         * It means cipher page was not allocated by llcrypt.
         */
        SetPagePrivate2(dstpage);

        for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
                err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
                                            i, lblk_num, gfp_flags);
                if (err)
                        return ERR_PTR(err);
        }
        SetPagePrivate(dstpage);
        set_page_private(dstpage, (unsigned long)srcpage);
        return dstpage;
}

/**
 * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
 *
 * This overlay function is necessary to handle bounce pages
 * allocated by ourselves.
 */
static inline void osc_finalize_bounce_page(struct page **pagep)
{
        struct page *page = *pagep;

        ClearPageChecked(page);
        /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
         * to indicate the cipher page was allocated by ourselves.
         * So we must not free it via llcrypt.
         */
        if (unlikely(!page || !PagePrivate2(page)))
                return llcrypt_finalize_bounce_page(pagep);

        if (llcrypt_is_bounce_page(page)) {
                *pagep = llcrypt_pagecache_page(page);
                ClearPagePrivate2(page);
                set_page_private(page, (unsigned long)NULL);
                ClearPagePrivate(page);
        }
}
#else /* !CONFIG_LL_ENCRYPTION */
#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
        llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
#endif

1466 static inline void osc_release_bounce_pages(struct brw_page **pga,
1467                                             u32 page_count)
1468 {
1469 #ifdef HAVE_LUSTRE_CRYPTO
1470         struct page **pa = NULL;
1471         int i, j = 0;
1472
1473         if (!pga[0])
1474                 return;
1475
1476 #ifdef CONFIG_LL_ENCRYPTION
1477         if (PageChecked(pga[0]->bp_page)) {
1478                 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1479                 if (!pa)
1480                         return;
1481         }
1482 #endif
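	/* With CONFIG_LL_ENCRYPTION the bounce pages were taken from the enc
	 * pool, so collect them in pa[] as they are finalized and give them
	 * back to the pool in one batch below; otherwise llcrypt owns the
	 * bounce pages and frees them itself. */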
1483
1484         for (i = 0; i < page_count; i++) {
1485                 /* Bounce pages used by osc_encrypt_pagecache_blocks()
1486                  * called from osc_brw_prep_request()
1487                  * are identified thanks to the PageChecked flag.
1488                  */
1489                 if (PageChecked(pga[i]->bp_page)) {
1490                         if (pa)
1491                                 pa[j++] = pga[i]->bp_page;
1492                         osc_finalize_bounce_page(&pga[i]->bp_page);
1493                 }
1494                 pga[i]->bp_count -= pga[i]->bp_count_diff;
1495                 pga[i]->bp_off += pga[i]->bp_off_diff;
1496         }
1497
1498         if (pa) {
1499                 obd_pool_put_pages_array(pa, j);
1500                 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
1501         }
1502 #endif
1503 }
1504
1505 static int
1506 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1507                      u32 page_count, struct brw_page **pga,
1508                      struct ptlrpc_request **reqp, int resend)
1509 {
1510         struct ptlrpc_request *req;
1511         struct ptlrpc_bulk_desc *desc;
1512         struct ost_body *body;
1513         struct obd_ioobj *ioobj;
1514         struct niobuf_remote *niobuf;
1515         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1516         struct osc_brw_async_args *aa;
1517         struct req_capsule *pill;
1518         struct brw_page *pg_prev;
1519         void *short_io_buf;
1520         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1521         struct inode *inode = NULL;
1522         bool directio = false;
1523         bool gpu = false;
1524         bool enable_checksum = true;
1525         struct cl_page *clpage;
1526
1527         ENTRY;
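	/* A transient cl_page (CPT_TRANSIENT) means the I/O bypasses the page
	 * cache, i.e. this is direct I/O; the encryption paths below need to
	 * know this. */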
1528         if (pga[0]->bp_page) {
1529                 clpage = oap2cl_page(brw_page2oap(pga[0]));
1530                 inode = clpage->cp_inode;
1531                 if (clpage->cp_type == CPT_TRANSIENT)
1532                         directio = true;
1533         }
1534         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1535                 RETURN(-ENOMEM); /* Recoverable */
1536         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1537                 RETURN(-EINVAL); /* Fatal */
1538
1539         if ((cmd & OBD_BRW_WRITE) != 0) {
1540                 opc = OST_WRITE;
1541                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1542                                                 osc_rq_pool,
1543                                                 &RQF_OST_BRW_WRITE);
1544         } else {
1545                 opc = OST_READ;
1546                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1547         }
1548         if (req == NULL)
1549                 RETURN(-ENOMEM);
1550
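	/* Writes to an encrypted file with the key available: substitute an
	 * encrypted bounce page for every clear text page in pga[] before the
	 * bulk/short-io buffers are built, growing count/off to cover whole
	 * encryption units. */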
1551         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1552             llcrypt_has_encryption_key(inode)) {
1553                 struct page **pa = NULL;
1554
1555 #ifdef CONFIG_LL_ENCRYPTION
1556                 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1557                 if (pa == NULL) {
1558                         ptlrpc_request_free(req);
1559                         RETURN(-ENOMEM);
1560                 }
1561
1562                 rc = obd_pool_get_pages_array(pa, page_count);
1563                 if (rc) {
1564                         CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
1565                                rc);
1566                         ptlrpc_request_free(req);
1567                         RETURN(rc);
1568                 }
1569 #endif
1570
1571                 for (i = 0; i < page_count; i++) {
1572                         struct brw_page *brwpg = pga[i];
1573                         struct page *data_page = NULL;
1574                         bool retried = false;
1575                         bool lockedbymyself;
1576                         u32 nunits =
1577                                 (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
1578                         struct address_space *map_orig = NULL;
1579                         pgoff_t index_orig;
1580
1581 retry_encrypt:
1582                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1583                         /* The page can already be locked when we arrive here.
1584                          * This is possible when cl_page_assume/vvp_page_assume
1585                          * is stuck on wait_on_page_writeback with page lock
1586                          * held. In this case there is no risk for the lock to
1587                          * be released while we are doing our encryption
1588                          * processing, because writeback against that page only
1589                          * ends in vvp_page_completion_write/cl_page_completion,
1590                          * i.e. once the page is fully processed.
1591                          */
1592                         lockedbymyself = trylock_page(brwpg->bp_page);
1593                         if (directio) {
1594                                 map_orig = brwpg->bp_page->mapping;
1595                                 brwpg->bp_page->mapping = inode->i_mapping;
1596                                 index_orig = brwpg->bp_page->index;
1597                                 clpage = oap2cl_page(brw_page2oap(brwpg));
1598                                 brwpg->bp_page->index = clpage->cp_page_index;
1599                         }
1600                         data_page =
1601                                 osc_encrypt_pagecache_blocks(brwpg->bp_page,
1602                                                             pa ? pa[i] : NULL,
1603                                                             nunits, 0,
1604                                                             GFP_NOFS);
1605                         if (directio) {
1606                                 brwpg->bp_page->mapping = map_orig;
1607                                 brwpg->bp_page->index = index_orig;
1608                         }
1609                         if (lockedbymyself)
1610                                 unlock_page(brwpg->bp_page);
1611                         if (IS_ERR(data_page)) {
1612                                 rc = PTR_ERR(data_page);
1613                                 if (rc == -ENOMEM && !retried) {
1614                                         retried = true;
1615                                         rc = 0;
1616                                         goto retry_encrypt;
1617                                 }
1618                                 if (pa) {
1619                                         obd_pool_put_pages_array(pa + i,
1620                                                                  page_count - i);
1621                                         OBD_FREE_PTR_ARRAY_LARGE(pa,
1622                                                                  page_count);
1623                                 }
1624                                 ptlrpc_request_free(req);
1625                                 RETURN(rc);
1626                         }
1627                         /* Set PageChecked flag on bounce page for
1628                          * disambiguation in osc_release_bounce_pages().
1629                          */
1630                         SetPageChecked(data_page);
1631                         brwpg->bp_page = data_page;
1632                         /* there should be no gap in the middle of the page array */
1633                         if (i == page_count - 1) {
1634                                 struct osc_async_page *oap =
1635                                         brw_page2oap(brwpg);
1636
1637                                 oa->o_size = oap->oap_count +
1638                                         oap->oap_obj_off + oap->oap_page_off;
1639                         }
1640                         /* len is forced to nunits, and the relative offset
1641                          * to 0, so store the old clear text values
1642                          */
1643                         brwpg->bp_count_diff = nunits - brwpg->bp_count;
1644                         brwpg->bp_count = nunits;
1645                         brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
1646                         brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
1647                 }
1648
1649                 if (pa)
1650                         OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
1651         } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1652                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1653                 struct cl_page *clpage = oap2cl_page(oap);
1654                 struct cl_object *clobj = clpage->cp_obj;
1655                 struct cl_attr attr = { 0 };
1656                 struct lu_env *env;
1657                 __u16 refcheck;
1658
1659                 env = cl_env_get(&refcheck);
1660                 if (IS_ERR(env)) {
1661                         rc = PTR_ERR(env);
1662                         ptlrpc_request_free(req);
1663                         RETURN(rc);
1664                 }
1665
1666                 cl_object_attr_lock(clobj);
1667                 rc = cl_object_attr_get(env, clobj, &attr);
1668                 cl_object_attr_unlock(clobj);
1669                 cl_env_put(env, &refcheck);
1670                 if (rc != 0) {
1671                         ptlrpc_request_free(req);
1672                         RETURN(rc);
1673                 }
1674                 if (attr.cat_size)
1675                         oa->o_size = attr.cat_size;
1676         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1677                    llcrypt_has_encryption_key(inode)) {
1678                 for (i = 0; i < page_count; i++) {
1679                         struct brw_page *pg = pga[i];
1680                         u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
1681
1682                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1683                         /* count/off are forced to cover the whole encryption
1684                          * unit size so that all encrypted data is stored on
1685                          * the OST; adjust bp_{count,off}_diff to remember the
1686                          * clear text size.
1687                          */
1688                         pg->bp_count_diff = nunits - pg->bp_count;
1689                         pg->bp_count = nunits;
1690                         pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
1691                         pg->bp_off = pg->bp_off & PAGE_MASK;
1692                 }
1693         }
1694
1695         for (niocount = i = 1; i < page_count; i++) {
1696                 if (!can_merge_pages(pga[i - 1], pga[i]))
1697                         niocount++;
1698         }
1699
1700         pill = &req->rq_pill;
1701         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1702                              sizeof(*ioobj));
1703         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1704                              niocount * sizeof(*niobuf));
1705
1706         for (i = 0; i < page_count; i++) {
1707                 short_io_size += pga[i]->bp_count;
1708                 if (!inode || !IS_ENCRYPTED(inode) ||
1709                     !llcrypt_has_encryption_key(inode)) {
1710                         pga[i]->bp_count_diff = 0;
1711                         pga[i]->bp_off_diff = 0;
1712                 }
1713         }
1714
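	/* Pages flagged OBD_BRW_RDMA_ONLY (e.g. GPU buffers) must go over
	 * bulk RDMA, so disable short I/O and checksums for them: the data
	 * may not be directly addressable by the CPU. */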
1715         if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1716                 enable_checksum = false;
1717                 short_io_size = 0;
1718                 gpu = 1;
1719         }
1720
1721         /* Check if read/write is small enough to be a short io. */
1722         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1723             !imp_connect_shortio(cli->cl_import))
1724                 short_io_size = 0;
1725
1726         /* If this is an empty RPC to an old server, just ignore it */
1727         if (!short_io_size && !pga[0]->bp_page) {
1728                 ptlrpc_request_free(req);
1729                 RETURN(-ENODATA);
1730         }
1731
1732         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1733                              opc == OST_READ ? 0 : short_io_size);
1734         if (opc == OST_READ)
1735                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1736                                      short_io_size);
1737
1738         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1739         if (rc) {
1740                 ptlrpc_request_free(req);
1741                 RETURN(rc);
1742         }
1743         osc_set_io_portal(req);
1744
1745         ptlrpc_at_set_req_timeout(req);
1746         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1747          * retry logic */
1748         req->rq_no_retry_einprogress = 1;
1749
1750         if (short_io_size != 0) {
1751                 desc = NULL;
1752                 short_io_buf = NULL;
1753                 goto no_bulk;
1754         }
1755
1756         desc = ptlrpc_prep_bulk_imp(req, page_count,
1757                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1758                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1759                         PTLRPC_BULK_PUT_SINK),
1760                 OST_BULK_PORTAL,
1761                 &ptlrpc_bulk_kiov_pin_ops);
1762
1763         if (desc == NULL)
1764                 GOTO(out, rc = -ENOMEM);
1765         /* NB request now owns desc and will free it when it gets freed */
1766         desc->bd_is_rdma = gpu;
1767 no_bulk:
1768         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1769         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1770         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1771         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1772
1773         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1774
1775         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1776          * and from_kgid(), because they are asynchronous. Fortunately, the
1777          * variable oa already contains valid o_uid and o_gid for these two
1778          * operations. Besides, filling o_uid and o_gid is enough for nrs-tbf,
1779          * see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set in order to
1780          * avoid breaking other processing logic */
1781         body->oa.o_uid = oa->o_uid;
1782         body->oa.o_gid = oa->o_gid;
1783
1784         obdo_to_ioobj(oa, ioobj);
1785         ioobj->ioo_bufcnt = niocount;
1786         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1787          * of bulks that might be sent for this request.  The actual number is
1788          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1789          * sends "max - 1" for compatibility with old clients sending "0", and
1790          * also so that the actual maximum is a power of two, not one less. LU-1431 */
1791         if (desc != NULL)
1792                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1793         else /* short io */
1794                 ioobj_max_brw_set(ioobj, 0);
1795
1796         if (inode && IS_ENCRYPTED(inode) &&
1797             llcrypt_has_encryption_key(inode) &&
1798             !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
1799                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1800                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1801                         body->oa.o_flags = 0;
1802                 }
1803                 body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
1804         }
1805
1806         if (short_io_size != 0) {
1807                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1808                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1809                         body->oa.o_flags = 0;
1810                 }
1811                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1812                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1813                        short_io_size);
1814                 if (opc == OST_WRITE) {
1815                         short_io_buf = req_capsule_client_get(pill,
1816                                                               &RMF_SHORT_IO);
1817                         LASSERT(short_io_buf != NULL);
1818                 }
1819         }
1820
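	/* Walk the sorted page array: for a short I/O write, copy each page
	 * into the inline request buffer, otherwise attach it to the bulk
	 * descriptor; file-contiguous pages are merged into a single remote
	 * niobuf. */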
1821         LASSERT(page_count > 0);
1822         pg_prev = pga[0];
1823         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1824                 struct brw_page *pg = pga[i];
1825                 int poff = pg->bp_off & ~PAGE_MASK;
1826
1827                 LASSERT(pg->bp_count > 0);
1828                 /* make sure there is no gap in the middle of the page array */
1829                 LASSERTF(page_count == 1 ||
1830                          (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
1831                           ergo(i > 0 && i < page_count - 1,
1832                                poff == 0 && pg->bp_count == PAGE_SIZE)   &&
1833                           ergo(i == page_count - 1, poff == 0)),
1834                          "i: %d/%d pg: %px off: %llu, count: %u\n",
1835                          i, page_count, pg, pg->bp_off, pg->bp_count);
1836                 LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
1837                          "i %d p_c %u pg %px [pri %lu ind %lu] off %llu prev_pg %px [pri %lu ind %lu] off %llu\n",
1838                          i, page_count,
1839                          pg->bp_page, page_private(pg->bp_page),
1840                          pg->bp_page->index, pg->bp_off,
1841                          pg_prev->bp_page, page_private(pg_prev->bp_page),
1842                          pg_prev->bp_page->index, pg_prev->bp_off);
1843                 LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
1844                         (pg->bp_flag & OBD_BRW_SRVLOCK));
1845                 if (short_io_size != 0 && opc == OST_WRITE) {
1846                         unsigned char *ptr = kmap_atomic(pg->bp_page);
1847
1848                         LASSERT(short_io_size >= requested_nob + pg->bp_count);
1849                         memcpy(short_io_buf + requested_nob,
1850                                ptr + poff,
1851                                pg->bp_count);
1852                         kunmap_atomic(ptr);
1853                 } else if (short_io_size == 0) {
1854                         desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page, poff,
1855                                                          pg->bp_count);
1856                 }
1857                 requested_nob += pg->bp_count;
1858
1859                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1860                         niobuf--;
1861                         niobuf->rnb_len += pg->bp_count;
1862                 } else {
1863                         niobuf->rnb_offset = pg->bp_off;
1864                         niobuf->rnb_len    = pg->bp_count;
1865                         niobuf->rnb_flags  = pg->bp_flag;
1866                 }
1867                 pg_prev = pg;
1868                 if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MARK_COMPRESSED))
1869                         niobuf->rnb_flags |= OBD_BRW_COMPRESSED;
1870         }
1871
1872         LASSERTF((void *)(niobuf - niocount) ==
1873                  req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1874                  "want %px - real %px\n",
1875                  req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1876                  (void *)(niobuf - niocount));
1877
1878         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1879         if (resend) {
1880                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1881                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1882                         body->oa.o_flags = 0;
1883                 }
1884                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1885         }
1886
1887         if (osc_should_shrink_grant(cli))
1888                 osc_shrink_grant_local(cli, &body->oa);
1889
1890         if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1891                 enable_checksum = false;
1892
1893         /* size[REQ_REC_OFF] still sizeof (*body) */
1894         if (opc == OST_WRITE) {
1895                 if (enable_checksum) {
1896                         /* store cl_cksum_type in a local variable since
1897                          * it can be changed via lprocfs */
1898                         enum cksum_types cksum_type = cli->cl_cksum_type;
1899
1900                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1901                                 body->oa.o_flags = 0;
1902
1903                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1904                                                                 cksum_type);
1905                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1906
1907                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1908                                                   requested_nob, page_count,
1909                                                   pga, OST_WRITE,
1910                                                   &body->oa.o_cksum, resend);
1911                         if (rc < 0) {
1912                                 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1913                                        rc);
1914                                 GOTO(out, rc);
1915                         }
1916                         CDEBUG(D_PAGE | (resend ? D_HA : 0),
1917                                "checksum at write origin: %x (%x)\n",
1918                                body->oa.o_cksum, cksum_type);
1919
1920                         /* save this in 'oa', too, for later checking */
1921                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1922                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1923                                                            cksum_type);
1924                 } else {
1925                         /* clear out the checksum flag, in case this is a
1926                          * resend but cl_checksum is no longer set. b=11238 */
1927                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1928                 }
1929                 oa->o_cksum = body->oa.o_cksum;
1930                 /* 1 RC per niobuf */
1931                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1932                                      sizeof(__u32) * niocount);
1933         } else {
1934                 if (enable_checksum) {
1935                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1936                                 body->oa.o_flags = 0;
1937                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1938                                 cli->cl_cksum_type);
1939                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1940                 }
1941
1942                 /* Client cksum has already been copied to the wire obdo in the
1943                  * previous lustre_set_wire_obdo(), and in case a bulk-read is
1944                  * being resent due to a cksum error, this will allow the server
1945                  * to check+dump the pages on its side */
1946         }
1947         ptlrpc_request_set_replen(req);
1948
1949         aa = ptlrpc_req_async_args(aa, req);
1950         aa->aa_oa = oa;
1951         aa->aa_requested_nob = requested_nob;
1952         aa->aa_nio_count = niocount;
1953         aa->aa_page_count = page_count;
1954         aa->aa_resends = 0;
1955         aa->aa_ppga = pga;
1956         aa->aa_cli = cli;
1957         INIT_LIST_HEAD(&aa->aa_oaps);
1958
1959         *reqp = req;
1960         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1961         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1962                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1963                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1964         RETURN(0);
1965
1966  out:
1967         ptlrpc_req_finished(req);
1968         RETURN(rc);
1969 }
1970
1971 char dbgcksum_file_name[PATH_MAX];
1972
1973 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1974                                 struct brw_page **pga, __u32 server_cksum,
1975                                 __u32 client_cksum)
1976 {
1977         struct file *filp;
1978         int rc, i;
1979         unsigned int len;
1980         char *buf;
1981
1982         /* only keep a dump of the pages on the first error for the same
1983          * range in the file/fid, not during resends/retries. */
1984         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1985                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1986                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1987                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1988                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1989                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1990                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1991                  pga[0]->bp_off,
1992                  pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
1993                  client_cksum, server_cksum);
1994         CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
1995         filp = filp_open(dbgcksum_file_name,
1996                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1997         if (IS_ERR(filp)) {
1998                 rc = PTR_ERR(filp);
1999                 if (rc == -EEXIST)
2000                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
2001                                "checksum error: rc = %d\n", dbgcksum_file_name,
2002                                rc);
2003                 else
2004                         CERROR("%s: can't open to dump pages with checksum "
2005                                "error: rc = %d\n", dbgcksum_file_name, rc);
2006                 return;
2007         }
2008
2009         for (i = 0; i < page_count; i++) {
2010                 len = pga[i]->bp_count;
2011                 buf = kmap(pga[i]->bp_page);
2012                 while (len != 0) {
2013                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
2014                         if (rc < 0) {
2015                                 CERROR("%s: wanted to write %u but got %d "
2016                                        "error\n", dbgcksum_file_name, len, rc);
2017                                 break;
2018                         }
2019                         len -= rc;
2020                         buf += rc;
2021                 }
2022                 kunmap(pga[i]->bp_page);
2023         }
2024
2025         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
2026         if (rc)
2027                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
2028         filp_close(filp, NULL);
2029
2030         libcfs_debug_dumplog();
2031 }
2032
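/* Compare the checksum the client computed at write time with the one the
 * server computed. Returns 0 when they match; otherwise recomputes the
 * checksum locally to help pinpoint where the corruption happened, logs the
 * details, and returns 1 so that the caller fails the request with -EAGAIN
 * and the bulk is resent. */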
2033 static int
2034 check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
2035                      __u32 client_cksum, __u32 server_cksum,
2036                      struct osc_brw_async_args *aa)
2037 {
2038         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
2039         enum cksum_types cksum_type;
2040         obd_dif_csum_fn *fn = NULL;
2041         int sector_size = 0;
2042         __u32 new_cksum;
2043         char *msg;
2044         int rc;
2045
2046         if (server_cksum == client_cksum) {
2047                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2048                 return 0;
2049         }
2050
2051         if (aa->aa_cli->cl_checksum_dump)
2052                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
2053                                     server_cksum, client_cksum);
2054
2055         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
2056                                            oa->o_flags : 0);
2057
2058         switch (cksum_type) {
2059         case OBD_CKSUM_T10IP512:
2060                 fn = obd_dif_ip_fn;
2061                 sector_size = 512;
2062                 break;
2063         case OBD_CKSUM_T10IP4K:
2064                 fn = obd_dif_ip_fn;
2065                 sector_size = 4096;
2066                 break;
2067         case OBD_CKSUM_T10CRC512:
2068                 fn = obd_dif_crc_fn;
2069                 sector_size = 512;
2070                 break;
2071         case OBD_CKSUM_T10CRC4K:
2072                 fn = obd_dif_crc_fn;
2073                 sector_size = 4096;
2074                 break;
2075         default:
2076                 break;
2077         }
2078
2079         if (fn)
2080                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
2081                                              aa->aa_page_count, aa->aa_ppga,
2082                                              OST_WRITE, fn, sector_size,
2083                                              &new_cksum, true);
2084         else
2085                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
2086                                        aa->aa_ppga, OST_WRITE, cksum_type,
2087                                        &new_cksum);
2088
2089         if (rc < 0)
2090                 msg = "failed to calculate the client write checksum";
2091         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
2092                 msg = "the server did not use the checksum type specified in "
2093                       "the original request - likely a protocol problem";
2094         else if (new_cksum == server_cksum)
2095                 msg = "changed on the client after we checksummed it - "
2096                       "likely false positive due to mmap IO (bug 11742)";
2097         else if (new_cksum == client_cksum)
2098                 msg = "changed in transit before arrival at OST";
2099         else
2100                 msg = "changed in transit AND doesn't match the original - "
2101                       "likely false positive due to mmap IO (bug 11742)";
2102
2103         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
2104                            DFID " object "DOSTID" extent [%llu-%llu], original "
2105                            "client csum %x (type %x), server csum %x (type %x),"
2106                            " client csum now %x\n",
2107                            obd_name, msg, libcfs_nidstr(&peer->nid),
2108                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
2109                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
2110                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
2111                            POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off,
2112                            aa->aa_ppga[aa->aa_page_count - 1]->bp_off +
2113                                 aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1,
2114                            client_cksum,
2115                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
2116                            server_cksum, cksum_type, new_cksum);
2117         return 1;
2118 }
2119
2120 /* Note rc enters this function as the number of bytes transferred */
2121 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
2122 {
2123         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
2124         struct client_obd *cli = aa->aa_cli;
2125         const char *obd_name = cli->cl_import->imp_obd->obd_name;
2126         const struct lnet_processid *peer =
2127                 &req->rq_import->imp_connection->c_peer;
2128         struct ost_body *body;
2129         u32 client_cksum = 0;
2130         struct inode *inode = NULL;
2131         unsigned int blockbits = 0, blocksize = 0;
2132         struct cl_page *clpage;
2133
2134         ENTRY;
2135
2136         if (rc < 0 && rc != -EDQUOT) {
2137                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
2138                 RETURN(rc);
2139         }
2140
2141         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
2142         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
2143         if (body == NULL) {
2144                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
2145                 RETURN(-EPROTO);
2146         }
2147
2148         /* set/clear over quota flag for a uid/gid/projid */
2149         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
2150             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
2151                 unsigned qid[LL_MAXQUOTAS] = {
2152                                          body->oa.o_uid, body->oa.o_gid,
2153                                          body->oa.o_projid };
2154                 CDEBUG(D_QUOTA,
2155                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
2156                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
2157                        body->oa.o_valid, body->oa.o_flags);
2158                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2159                                 body->oa.o_flags);
2160         }
2161
2162         osc_update_grant(cli, body);
2163
2164         if (rc < 0)
2165                 RETURN(rc);
2166
2167         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2168                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
2169
2170         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2171                 if (rc > 0) {
2172                         CERROR("%s: unexpected positive size %d\n",
2173                                obd_name, rc);
2174                         RETURN(-EPROTO);
2175                 }
2176
2177                 if (req->rq_bulk != NULL &&
2178                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2179                         RETURN(-EAGAIN);
2180
2181                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2182                     check_write_checksum(&body->oa, peer, client_cksum,
2183                                          body->oa.o_cksum, aa))
2184                         RETURN(-EAGAIN);
2185
2186                 rc = check_write_rcs(req, aa->aa_requested_nob,
2187                                      aa->aa_nio_count, aa->aa_page_count,
2188                                      aa->aa_ppga);
2189                 GOTO(out, rc);
2190         }
2191
2192         /* The rest of this function executes only for OST_READs */
2193
2194         if (req->rq_bulk == NULL) {
2195                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2196                                           RCL_SERVER);
2197                 LASSERT(rc == req->rq_status);
2198         } else {
2199                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2200                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2201         }
2202         if (rc < 0)
2203                 GOTO(out, rc = -EAGAIN);
2204
2205         if (rc > aa->aa_requested_nob) {
2206                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2207                        rc, aa->aa_requested_nob);
2208                 RETURN(-EPROTO);
2209         }
2210
2211         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2212                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2213                        rc, req->rq_bulk->bd_nob_transferred);
2214                 RETURN(-EPROTO);
2215         }
2216
2217         if (req->rq_bulk == NULL) {
2218                 /* short io */
2219                 int nob, pg_count, i = 0;
2220                 unsigned char *buf;
2221
2222                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2223                 pg_count = aa->aa_page_count;
2224                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2225                                                    rc);
2226                 nob = rc;
2227                 while (nob > 0 && pg_count > 0) {
2228                         unsigned char *ptr;
2229                         int count = aa->aa_ppga[i]->bp_count > nob ?
2230                                     nob : aa->aa_ppga[i]->bp_count;
2231
2232                         CDEBUG(D_CACHE, "page %p count %d\n",
2233                                aa->aa_ppga[i]->bp_page, count);
2234                         ptr = kmap_atomic(aa->aa_ppga[i]->bp_page);
2235                         memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf,
2236                                count);
2237                         kunmap_atomic((void *) ptr);
2238
2239                         buf += count;
2240                         nob -= count;
2241                         i++;
2242                         pg_count--;
2243                 }
2244         }
2245
2246         if (rc < aa->aa_requested_nob)
2247                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2248
2249         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2250                 static int cksum_counter;
2251                 u32 server_cksum = body->oa.o_cksum;
2252                 int nob = rc;
2253                 char *via = "";
2254                 char *router = "";
2255                 enum cksum_types cksum_type;
2256                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2257                         body->oa.o_flags : 0;
2258
2259                 cksum_type = obd_cksum_type_unpack(o_flags);
2260                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2261                                           aa->aa_page_count, aa->aa_ppga,
2262                                           OST_READ, &client_cksum, false);
2263                 if (rc < 0)
2264                         GOTO(out, rc);
2265
2266                 if (req->rq_bulk != NULL &&
2267                     !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
2268                         via = " via ";
2269                         router = libcfs_nidstr(&req->rq_bulk->bd_sender);
2270                 }
2271
2272                 if (server_cksum != client_cksum) {
2273                         struct ost_body *clbody;
2274                         __u32 client_cksum2;
2275                         u32 page_count = aa->aa_page_count;
2276
2277                         osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2278                                              page_count, aa->aa_ppga,
2279                                              OST_READ, &client_cksum2, true);
2280                         clbody = req_capsule_client_get(&req->rq_pill,
2281                                                         &RMF_OST_BODY);
2282                         if (cli->cl_checksum_dump)
2283                                 dump_all_bulk_pages(&clbody->oa, page_count,
2284                                                     aa->aa_ppga, server_cksum,
2285                                                     client_cksum);
2286
2287                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2288                                            "%s%s%s inode "DFID" object "DOSTID
2289                                            " extent [%llu-%llu], client %x/%x, "
2290                                            "server %x, cksum_type %x\n",
2291                                            obd_name,
2292                                            libcfs_nidstr(&peer->nid),
2293                                            via, router,
2294                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2295                                                 clbody->oa.o_parent_seq : 0ULL,
2296                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2297                                                 clbody->oa.o_parent_oid : 0,
2298                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2299                                                 clbody->oa.o_parent_ver : 0,
2300                                            POSTID(&body->oa.o_oi),
2301                                            aa->aa_ppga[0]->bp_off,
2302                                            aa->aa_ppga[page_count-1]->bp_off +
2303                                            aa->aa_ppga[page_count-1]->bp_count - 1,
2304                                            client_cksum, client_cksum2,
2305                                            server_cksum, cksum_type);
2306                         cksum_counter = 0;
2307                         aa->aa_oa->o_cksum = client_cksum;
2308                         rc = -EAGAIN;
2309                 } else {
2310                         cksum_counter++;
2311                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2312                         rc = 0;
2313                 }
2314         } else if (unlikely(client_cksum)) {
2315                 static int cksum_missed;
2316
2317                 cksum_missed++;
2318                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2319                         CERROR("%s: checksum %u requested from %s but not sent\n",
2320                                obd_name, cksum_missed,
2321                                libcfs_nidstr(&peer->nid));
2322         } else {
2323                 rc = 0;
2324         }
2325
2326         /* get the inode from the first cl_page */
2327         clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
2328         inode = clpage->cp_inode;
2329         if (clpage->cp_type == CPT_TRANSIENT && inode) {
2330                 blockbits = inode->i_blkbits;
2331                 blocksize = 1 << blockbits;
2332         }
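	/* For reads from an encrypted file with the key available, decrypt
	 * the data in place now that the bulk (or short I/O reply) has landed
	 * in the pages, one LUSTRE_ENCRYPTION_UNIT_SIZE chunk at a time. */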
2333         if (inode && IS_ENCRYPTED(inode)) {
2334                 int idx;
2335
2336                 if (!llcrypt_has_encryption_key(inode)) {
2337                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2338                         GOTO(out, rc);
2339                 }
2340                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2341                         struct brw_page *brwpg = aa->aa_ppga[idx];
2342                         unsigned int offs = 0;
2343
2344                         while (offs < PAGE_SIZE) {
2345                                 /* do not decrypt if page is all 0s */
2346                                 if (memchr_inv(page_address(brwpg->bp_page) + offs,
2347                                       0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2348                                         /* if the page is empty, forward info
2349                                          * to upper layers (ll_io_zero_page)
2350                                          * by clearing PagePrivate2
2351                                          */
2352                                         if (!offs)
2353                                                 ClearPagePrivate2(brwpg->bp_page);
2354                                         break;
2355                                 }
2356
2357                                 if (blockbits) {
2358                                         /* This is the direct IO case: call the
2359                                          * decrypt function that takes the inode
2360                                          * as input parameter. The page does not
2361                                          * need to be locked.
2362                                          */
2363                                         u64 lblk_num;
2364                                         unsigned int i;
2365
2366                                         clpage =
2367                                                oap2cl_page(brw_page2oap(brwpg));
2368                                         lblk_num =
2369                                                 ((u64)(clpage->cp_page_index) <<
2370                                                 (PAGE_SHIFT - blockbits)) +
2371                                                 (offs >> blockbits);
2372                                         for (i = offs;
2373                                              i < offs +
2374                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2375                                              i += blocksize, lblk_num++) {
2376                                                 rc =
2377                                                   llcrypt_decrypt_block_inplace(
2378                                                           inode, brwpg->bp_page,
2379                                                           blocksize, i,
2380                                                           lblk_num);
2381                                                 if (rc)
2382                                                         break;
2383                                         }
2384                                 } else {
2385                                         rc = llcrypt_decrypt_pagecache_blocks(
2386                                                 brwpg->bp_page,
2387                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2388                                                 offs);
2389                                 }
2390                                 if (rc)
2391                                         GOTO(out, rc);
2392
2393                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2394                         }
2395                 }
2396         }
2397
2398 out:
2399         if (rc >= 0)
2400                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2401                                      aa->aa_oa, &body->oa);
2402
2403         RETURN(rc);
2404 }
2405
2406 static int osc_brw_redo_request(struct ptlrpc_request *request,
2407                                 struct osc_brw_async_args *aa, int rc)
2408 {
2409         struct ptlrpc_request *new_req;
2410         struct osc_brw_async_args *new_aa;
2411         ENTRY;
2412
2413         /* The below message is checked in replay-ost-single.sh test_8ae */
2414         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2415                   "redo for recoverable error %d", rc);
2416
2417         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2418                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2419                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2420                                   aa->aa_ppga, &new_req, 1);
2421         if (rc)
2422                 RETURN(rc);
2423
2425         LASSERTF(request == aa->aa_request,
2426                  "request %p != aa_request %p\n",
2427                  request, aa->aa_request);
2428         /*
2429          * The new request takes over pga and oaps from the old request.
2430          * Note that copying a list_head doesn't work; it must be moved.
2431          */
2432         aa->aa_resends++;
2433         new_req->rq_interpret_reply = request->rq_interpret_reply;
2434         new_req->rq_async_args = request->rq_async_args;
2435         new_req->rq_commit_cb = request->rq_commit_cb;
2436         /* cap resend delay to the current request timeout, this is similar to
2437          * what ptlrpc does (see after_reply()) */
2438         if (aa->aa_resends > new_req->rq_timeout)
2439                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2440         else
2441                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2442         new_req->rq_generation_set = 1;
2443         new_req->rq_import_generation = request->rq_import_generation;
2444
2445         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2446
2447         INIT_LIST_HEAD(&new_aa->aa_oaps);
2448         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2449         INIT_LIST_HEAD(&new_aa->aa_exts);
2450         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2451         new_aa->aa_resends = aa->aa_resends;
2452
2453         if (aa->aa_request) {
2454                 ptlrpc_req_finished(aa->aa_request);
2455                 new_aa->aa_request = ptlrpc_request_addref(new_req);
2456         }
2457
2458         /* XXX: This code will run into problems if we ever support adding
2459          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2460          * waiting for all of them to finish. We should inherit the request
2461          * set from the old request. */
2462         ptlrpcd_add_req(new_req);
2463
2464         DEBUG_REQ(D_INFO, new_req, "new request");
2465         RETURN(0);
2466 }
2467
2468 /*
2469  * Ugh, we want disk allocation on the target to happen in offset order, so
2470  * we follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
2471  * do fine for our small page arrays and doesn't require allocation.  It's an
2472  * insertion sort that swaps elements that are strides apart, shrinking the
2473  * stride down until it's '1' and the array is sorted.
2474  */
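/* The stride sequence generated by stride = 3 * stride + 1 below is
 * 1, 4, 13, 40, ... (Knuth's gap sequence). */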
2475 static void sort_brw_pages(struct brw_page **array, int num)
2476 {
2477         int stride, i, j;
2478         struct brw_page *tmp;
2479
2480         if (num == 1)
2481                 return;
2482         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2483                 ;
2484
2485         do {
2486                 stride /= 3;
2487                 for (i = stride ; i < num ; i++) {
2488                         tmp = array[i];
2489                         j = i;
2490                         while (j >= stride && array[j - stride]->bp_off > tmp->bp_off) {
2491                                 array[j] = array[j - stride];
2492                                 j -= stride;
2493                         }
2494                         array[j] = tmp;
2495                 }
2496         } while (stride > 1);
2497 }
2498
2499 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2500 {
2501         LASSERT(ppga != NULL);
2502         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2503 }
2504
2505 /* This is trying to propagate async writeback errors back up to the
2506  * application.  When an async write fails we record the error code for later
2507  * in case the app does an fsync.  As long as errors persist we force future
2508  * RPCs to be sync so that the app can get a sync error and break the cycle of
2509  * queueing pages for which writeback will fail.
2510  */
2511 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2512                            int rc)
2513 {
2514         if (rc) {
2515                 if (!ar->ar_rc)
2516                         ar->ar_rc = rc;
2517
2518                 ar->ar_force_sync = 1;
2519                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2520                 return;
2522         }
2523
2524         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2525                 ar->ar_force_sync = 0;
2526 }
2527
2528 static int brw_interpret(const struct lu_env *env,
2529                          struct ptlrpc_request *req, void *args, int rc)
2530 {
2531         struct osc_brw_async_args *aa = args;
2532         struct client_obd *cli = aa->aa_cli;
2533         unsigned long transferred = 0;
2534         struct cl_object *obj = NULL;
2535         struct osc_async_page *last;
2536         struct osc_extent *ext;
2537         struct osc_extent *tmp;
2538         struct lov_oinfo *loi;
2539
2540         ENTRY;
2541
2542         ext = list_first_entry(&aa->aa_exts, struct osc_extent, oe_link);
2543
2544         rc = osc_brw_fini_request(req, rc);
2545         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2546
2547         /* restore clear text pages */
2548         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2549
2550         /*
2551          * When server returns -EINPROGRESS, client should always retry
2552          * regardless of the number of times the bulk was resent already.
2553          */
2554         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2555                 if (req->rq_import_generation !=
2556                     req->rq_import->imp_generation) {
2557                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2558                                ""DOSTID", rc = %d.\n",
2559                                req->rq_import->imp_obd->obd_name,
2560                                POSTID(&aa->aa_oa->o_oi), rc);
2561                 } else if (rc == -EINPROGRESS ||
2562                            client_should_resend(aa->aa_resends, aa->aa_cli)) {
2563                         rc = osc_brw_redo_request(req, aa, rc);
2564                 } else {
2565                         CERROR("%s: too many resent retries for object: "
2566                                "%llu:%llu, rc = %d.\n",
2567                                req->rq_import->imp_obd->obd_name,
2568                                POSTID(&aa->aa_oa->o_oi), rc);
2569                 }
2570
2571                 if (rc == 0)
2572                         RETURN(0);
2573                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2574                         rc = -EIO;
2575         }
2576
2577         last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2578         obj = osc2cl(ext->oe_obj);
2579         loi = cl2osc(obj)->oo_oinfo;
2580
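	/* On success, fold the attributes returned in the reply obdo (blocks,
	 * [amc]times, and possibly size/KMS for writes) back into the
	 * cl_object attributes. */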
2581         if (rc == 0) {
2582                 struct obdo *oa = aa->aa_oa;
2583                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2584                 unsigned long valid = 0;
2585
2586                 cl_object_attr_lock(obj);
2587                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2588                         attr->cat_blocks = oa->o_blocks;
2589                         valid |= CAT_BLOCKS;
2590                 }
2591                 if (oa->o_valid & OBD_MD_FLMTIME) {
2592                         attr->cat_mtime = oa->o_mtime;
2593                         valid |= CAT_MTIME;
2594                 }
2595                 if (oa->o_valid & OBD_MD_FLATIME) {
2596                         attr->cat_atime = oa->o_atime;
2597                         valid |= CAT_ATIME;
2598                 }
2599                 if (oa->o_valid & OBD_MD_FLCTIME) {
2600                         attr->cat_ctime = oa->o_ctime;
2601                         valid |= CAT_CTIME;
2602                 }
2603
2604                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2605                         loff_t last_off = last->oap_count + last->oap_obj_off +
2606                                 last->oap_page_off;
2607
2608                         /* Change file size if this is an out of quota or
2609                          * direct IO write and it extends the file size */
2610                         if (loi->loi_lvb.lvb_size < last_off) {
2611                                 attr->cat_size = last_off;
2612                                 valid |= CAT_SIZE;
2613                         }
2614                         /* Extend KMS if it's not a lockless write */
2615                         if (loi->loi_kms < last_off &&
2616                             oap2osc_page(last)->ops_srvlock == 0) {
2617                                 attr->cat_kms = last_off;
2618                                 valid |= CAT_KMS;
2619                         }
2620                 }
2621
2622                 if (valid != 0)
2623                         cl_object_attr_update(env, obj, attr, valid);
2624                 cl_object_attr_unlock(obj);
2625         }
2626         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2627         aa->aa_oa = NULL;
2628
2629         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) {
2630                 osc_inc_unstable_pages(req);
2631                 /*
2632                  * If req->rq_committed is set, it means that the dirty pages
2633                  * have already been committed to stable storage on the OSTs
2634                  * (i.e. Direct I/O).
2635                  */
2636                 if (!req->rq_committed)
2637                         cl_object_dirty_for_sync(env, cl_object_top(obj));
2638         }
2639
2640         if (aa->aa_request) {
2641                 __u64 xid = ptlrpc_req_xid(req);
2642
2643                 ptlrpc_req_finished(req);
2644                 if (xid && lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2645                         spin_lock(&cli->cl_loi_list_lock);
2646                         osc_process_ar(&cli->cl_ar, xid, rc);
2647                         osc_process_ar(&loi->loi_ar, xid, rc);
2648                         spin_unlock(&cli->cl_loi_list_lock);
2649                 }
2650         }
2651         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2652                 list_del_init(&ext->oe_link);
2653                 osc_extent_finish(env, ext, 1,
2654                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2655         }
2656         LASSERT(list_empty(&aa->aa_exts));
2657         LASSERT(list_empty(&aa->aa_oaps));
2658
2659         transferred = (req->rq_bulk == NULL ? /* short io */
2660                        aa->aa_requested_nob :
2661                        req->rq_bulk->bd_nob_transferred);
2662
2663         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2664         ptlrpc_lprocfs_brw(req, transferred);
2665
2666         spin_lock(&cli->cl_loi_list_lock);
2667         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2668          * is called so we know whether to go to sync BRWs or wait for more
2669          * RPCs to complete */
2670         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2671                 cli->cl_w_in_flight--;
2672         else
2673                 cli->cl_r_in_flight--;
2674         osc_wake_cache_waiters(cli);
2675         spin_unlock(&cli->cl_loi_list_lock);
2676
2677         osc_io_unplug(env, cli, NULL);
2678         RETURN(rc);
2679 }
2680
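/*
 * Commit callback for BRW requests, assigned to req->rq_commit_cb in
 * osc_build_rpc().  If the pages were already accounted as unstable by
 * osc_inc_unstable_pages(), drop that accounting here; otherwise just
 * record that the request has been committed.
 */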
2681 static void brw_commit(struct ptlrpc_request *req)
2682 {
2683         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2684          * this callback (invoked via rq_commit_cb), we need to ensure
2685          * osc_dec_unstable_pages is still called. Otherwise unstable
2686          * pages may be leaked. */
2687         spin_lock(&req->rq_lock);
2688         if (likely(req->rq_unstable)) {
2689                 req->rq_unstable = 0;
2690                 spin_unlock(&req->rq_lock);
2691
2692                 osc_dec_unstable_pages(req);
2693         } else {
2694                 req->rq_committed = 1;
2695                 spin_unlock(&req->rq_lock);
2696         }
2697 }
2698
2699 /**
2700  * Build an RPC from the list of extents @ext_list. The caller must ensure
2701  * that the total number of pages in this list is NOT over max pages per RPC.
2702  * Extents in the list must be in OES_RPC state.
2703  */
2704 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2705                   struct list_head *ext_list, int cmd)
2706 {
2707         struct ptlrpc_request           *req = NULL;
2708         struct osc_extent               *ext;
2709         struct brw_page                 **pga = NULL;
2710         struct osc_brw_async_args       *aa = NULL;
2711         struct obdo                     *oa = NULL;
2712         struct osc_async_page           *oap;
2713         struct osc_object               *obj = NULL;
2714         struct cl_req_attr              *crattr = NULL;
2715         loff_t                          starting_offset = OBD_OBJECT_EOF;
2716         loff_t                          ending_offset = 0;
2717         /* '1' for consistency with code that checks !mpflag to restore */
2718         int mpflag = 1;
2719         int                             mem_tight = 0;
2720         int                             page_count = 0;
2721         bool                            soft_sync = false;
2722         bool                            ndelay = false;
2723         int                             i;
2724         int                             grant = 0;
2725         int                             rc;
2726         __u32                           layout_version = 0;
2727         LIST_HEAD(rpc_list);
2728         struct ost_body                 *body;
2729         ENTRY;
2730         LASSERT(!list_empty(ext_list));
2731
2732         /* add pages into rpc_list to build BRW rpc */
2733         list_for_each_entry(ext, ext_list, oe_link) {
2734                 LASSERT(ext->oe_state == OES_RPC);
2735                 mem_tight |= ext->oe_memalloc;
2736                 grant += ext->oe_grants;
2737                 page_count += ext->oe_nr_pages;
2738                 layout_version = max(layout_version, ext->oe_layout_version);
2739                 if (obj == NULL)
2740                         obj = ext->oe_obj;
2741         }
2742
2743         soft_sync = osc_over_unstable_soft_limit(cli);
2744         if (mem_tight)
2745                 mpflag = memalloc_noreclaim_save();
2746
2747         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2748         if (pga == NULL)
2749                 GOTO(out, rc = -ENOMEM);
2750
2751         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2752         if (oa == NULL)
2753                 GOTO(out, rc = -ENOMEM);
2754
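        /* Second pass: fill the brw_page array, link each page onto
         * rpc_list, and compute the starting/ending byte offsets of the
         * transfer. */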
2755         i = 0;
2756         list_for_each_entry(ext, ext_list, oe_link) {
2757                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2758                         if (mem_tight)
2759                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2760                         if (soft_sync)
2761                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2762                         pga[i] = &oap->oap_brw_page;
2763                         pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off;
2764                         i++;
2765
2766                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2767                         if (starting_offset == OBD_OBJECT_EOF ||
2768                             starting_offset > oap->oap_obj_off) {
2769                                 starting_offset = oap->oap_obj_off;
2770                         } else {
2771                                 CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n",
2772                                        i, oap->oap_obj_off, oap->oap_page_off);
2773                                 LASSERT(oap->oap_page_off == 0);
2774                         }
2775                         if (ending_offset < oap->oap_obj_off + oap->oap_count) {
2776                                 ending_offset = oap->oap_obj_off +
2777                                                 oap->oap_count;
2778                         } else {
2779                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2780                                         PAGE_SIZE);
2781                         }
2782                 }
2783                 if (ext->oe_ndelay)
2784                         ndelay = true;
2785         }
2786
2787         /* first page in the list */
2788         oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2789
2790         crattr = &osc_env_info(env)->oti_req_attr;
2791         memset(crattr, 0, sizeof(*crattr));
2792         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2793         crattr->cra_flags = ~0ULL;
2794         crattr->cra_page = oap2cl_page(oap);
2795         crattr->cra_oa = oa;
2796         cl_req_attr_set(env, osc2cl(obj), crattr);
2797
2798         if (cmd == OBD_BRW_WRITE) {
2799                 oa->o_grant_used = grant;
2800                 if (layout_version > 0) {
2801                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2802                                PFID(&oa->o_oi.oi_fid), layout_version);
2803
2804                         oa->o_layout_version = layout_version;
2805                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2806                 }
2807         }
2808
2809         sort_brw_pages(pga, page_count);
2810         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2811         if (rc != 0) {
2812                 CERROR("prep_req failed: %d\n", rc);
2813                 GOTO(out, rc);
2814         }
2815
2816         req->rq_commit_cb = brw_commit;
2817         req->rq_interpret_reply = brw_interpret;
2818         req->rq_memalloc = mem_tight != 0;
2819         if (ndelay) {
2820                 req->rq_no_resend = req->rq_no_delay = 1;
2821                 /* We could probably set a shorter timeout value here
2822                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2823                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2824         }
2825
2826         /* Need to update the timestamps after the request is built in case
2827          * we race with setattr (locally or in queue at the OST).  If the OST
2828          * gets a later setattr before an earlier BRW (as determined by the
2829          * request xid), the OST will not use the BRW timestamps.  Sadly,
2830          * there is no obvious way to do this in a single call.  bug 10150 */
2831         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2832         crattr->cra_oa = &body->oa;
2833         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2834         cl_req_attr_set(env, osc2cl(obj), crattr);
2835         lustre_msg_set_uid_gid(req->rq_reqmsg, &crattr->cra_uid,
2836                                &crattr->cra_gid);
2837         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2838
2839         aa = ptlrpc_req_async_args(aa, req);
2840         INIT_LIST_HEAD(&aa->aa_oaps);
2841         list_splice_init(&rpc_list, &aa->aa_oaps);
2842         INIT_LIST_HEAD(&aa->aa_exts);
2843         list_splice_init(ext_list, &aa->aa_exts);
2844         aa->aa_request = ptlrpc_request_addref(req);
2845
2846         spin_lock(&cli->cl_loi_list_lock);
2847         starting_offset >>= PAGE_SHIFT;
2848         ending_offset >>= PAGE_SHIFT;
2849         if (cmd == OBD_BRW_READ) {
2850                 cli->cl_r_in_flight++;
2851                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2852                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2853                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2854                                       starting_offset + 1);
2855         } else {
2856                 cli->cl_w_in_flight++;
2857                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2858                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2859                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2860                                       starting_offset + 1);
2861         }
2862         spin_unlock(&cli->cl_loi_list_lock);
2863
2864         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2865                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2866         if (libcfs_debug & D_IOTRACE) {
2867                 struct lu_fid fid;
2868
2869                 fid.f_seq = crattr->cra_oa->o_parent_seq;
2870                 fid.f_oid = crattr->cra_oa->o_parent_oid;
2871                 fid.f_ver = crattr->cra_oa->o_parent_ver;
2872                 CDEBUG(D_IOTRACE,
2873                        DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
2874                        PFID(&fid), page_count,
2875                        cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
2876                        ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
2877         }
2878         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2879
2880         ptlrpcd_add_req(req);
2881         rc = 0;
2882         EXIT;
2883
2884 out:
2885         if (mem_tight)
2886                 memalloc_noreclaim_restore(mpflag);
2887
2888         if (rc != 0) {
2889                 LASSERT(req == NULL);
2890
2891                 if (oa)
2892                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2893                 if (pga) {
2894                         osc_release_bounce_pages(pga, page_count);
2895                         osc_release_ppga(pga, page_count);
2896                 }
2897                 /* This should happen rarely and is pretty bad; it makes the
2898                  * pending list not follow the dirty order
2899                  */
2900                 while ((ext = list_first_entry_or_null(ext_list,
2901                                                        struct osc_extent,
2902                                                        oe_link)) != NULL) {
2903                         list_del_init(&ext->oe_link);
2904                         osc_extent_finish(env, ext, 0, rc);
2905                 }
2906         }
2907         RETURN(rc);
2908 }
2909
2910 /* This is to refresh our lock in the face of no RPCs. */
2911 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2912 {
2913         struct ptlrpc_request *req;
2914         struct obdo oa;
2915         struct brw_page bpg = { .bp_off = start, .bp_count = 1};
2916         struct brw_page *pga = &bpg;
2917         int rc;
2918
2919         memset(&oa, 0, sizeof(oa));
2920         oa.o_oi = osc->oo_oinfo->loi_oi;
2921         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2922         /* For updated servers - don't do a read */
2923         oa.o_flags = OBD_FL_NORPC;
2924
2925         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2926                                   &req, 0);
2927
2928         /* If we succeeded, we ship it off; if not, there's no point in
2929          * doing anything. Also no resends,
2930          * no interpret callback, and no commit callback.
2931          */
2932         if (!rc) {
2933                 req->rq_no_resend = 1;
2934                 ptlrpcd_add_req(req);
2935         }
2936 }
2937
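/*
 * Bind @data to @lock->l_ast_data unless the lock is already in use.
 * Return 1 if l_ast_data ends up pointing at @data (whether we just set
 * it or it was already set), 0 if the lock belongs to another object.
 */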
2938 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2939 {
2940         int set = 0;
2941
2942         LASSERT(lock != NULL);
2943
2944         lock_res_and_lock(lock);
2945
2946         if (lock->l_ast_data == NULL)
2947                 lock->l_ast_data = data;
2948         if (lock->l_ast_data == data)
2949                 set = 1;
2950
2951         unlock_res_and_lock(lock);
2952
2953         return set;
2954 }
2955
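/*
 * Complete an enqueue on the OSC side: for an aborted intent enqueue,
 * pick up the server's status from the intent reply; mark the LVB ready
 * where appropriate; call the caller's upcall with the final error code;
 * and drop the reference taken in ldlm_cli_enqueue() on a granted or
 * matched lock.
 */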
2956 static int osc_enqueue_fini(struct ptlrpc_request *req,
2957                             osc_enqueue_upcall_f upcall,
2958                             void *cookie, struct lustre_handle *lockh,
2959                             enum ldlm_mode mode, __u64 *flags,
2960                             bool speculative, int errcode)
2961 {
2962         bool intent = *flags & LDLM_FL_HAS_INTENT;
2963         int rc;
2964         ENTRY;
2965
2966         /* The request was created before ldlm_cli_enqueue call. */
2967         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2968                 struct ldlm_reply *rep;
2969
2970                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2971                 LASSERT(rep != NULL);
2972
2973                 rep->lock_policy_res1 =
2974                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2975                 if (rep->lock_policy_res1)
2976                         errcode = rep->lock_policy_res1;
2977                 if (!speculative)
2978                         *flags |= LDLM_FL_LVB_READY;
2979         } else if (errcode == ELDLM_OK) {
2980                 *flags |= LDLM_FL_LVB_READY;
2981         }
2982
2983         /* Call the update callback. */
2984         rc = (*upcall)(cookie, lockh, errcode);
2985
2986         /* release the reference taken in ldlm_cli_enqueue() */
2987         if (errcode == ELDLM_LOCK_MATCHED)
2988                 errcode = ELDLM_OK;
2989         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2990                 ldlm_lock_decref(lockh, mode);
2991
2992         RETURN(rc);
2993 }
2994
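/*
 * Interpret callback for an asynchronous lock enqueue: finish the LDLM
 * part of the enqueue with ldlm_cli_enqueue_fini() and the OSC part with
 * osc_enqueue_fini(), holding an extra reference on the lock so that a
 * blocking AST posted for a failed lock can only arrive after the upcall
 * has been executed.
 */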
2995 static int osc_enqueue_interpret(const struct lu_env *env,
2996                                  struct ptlrpc_request *req,
2997                                  void *args, int rc)
2998 {
2999         struct osc_enqueue_args *aa = args;
3000         struct ldlm_lock *lock;
3001         struct lustre_handle *lockh = &aa->oa_lockh;
3002         enum ldlm_mode mode = aa->oa_mode;
3003         struct ost_lvb *lvb = aa->oa_lvb;
3004         __u32 lvb_len = sizeof(*lvb);
3005         __u64 flags = 0;
3006         struct ldlm_enqueue_info einfo = {
3007                 .ei_type = aa->oa_type,
3008                 .ei_mode = mode,
3009         };
3010
3011         ENTRY;
3012
3013         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3014          * be valid. */
3015         lock = ldlm_handle2lock(lockh);
3016         LASSERTF(lock != NULL,
3017                  "lockh %#llx, req %px, aa %px - client evicted?\n",
3018                  lockh->cookie, req, aa);
3019
3020         /* Take an additional reference so that a blocking AST that
3021          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3022          * to arrive after an upcall has been executed by
3023          * osc_enqueue_fini(). */
3024         ldlm_lock_addref(lockh, mode);
3025
3026         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
3027         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
3028
3029         /* Let the CP AST grant the lock first. */
3030         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3031
3032         if (aa->oa_speculative) {
3033                 LASSERT(aa->oa_lvb == NULL);
3034                 LASSERT(aa->oa_flags == NULL);
3035                 aa->oa_flags = &flags;
3036         }
3037
3038         /* Complete obtaining the lock procedure. */
3039         rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
3040                                    aa->oa_flags, lvb, lvb_len, lockh, rc,
3041                                    false);
3042         /* Complete osc stuff. */
3043         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
3044                               aa->oa_flags, aa->oa_speculative, rc);
3045
3046         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3047
3048         ldlm_lock_decref(lockh, mode);
3049         LDLM_LOCK_PUT(lock);
3050         RETURN(rc);
3051 }
3052
3053 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
3054  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3055  * other synchronous requests, however keeping some locks while trying to
3056  * obtain others may take a considerable amount of time in case of OST failure;
3057  * and when a client does not release a lock that other sync requests are
3058  * waiting for, the client is evicted from the cluster -- such scenarios make
3059  * life difficult, so release locks just after they are obtained. */
3060 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3061                      __u64 *flags, union ldlm_policy_data *policy,
3062                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
3063                      void *cookie, struct ldlm_enqueue_info *einfo,
3064                      struct ptlrpc_request_set *rqset, int async,
3065                      bool speculative)
3066 {
3067         struct obd_device *obd = exp->exp_obd;
3068         struct lustre_handle lockh = { 0 };
3069         struct ptlrpc_request *req = NULL;
3070         int intent = *flags & LDLM_FL_HAS_INTENT;
3071         __u64 search_flags = *flags;
3072         __u64 match_flags = 0;
3073         enum ldlm_mode mode;
3074         int rc;
3075         ENTRY;
3076
3077         /* Filesystem lock extents are extended to page boundaries so that
3078          * dealing with the page cache is a little smoother.  */
3079         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3080         policy->l_extent.end |= ~PAGE_MASK;
3081
3082         /* Next, search for already existing extent locks that will cover us */
3083         /* If we're trying to read, we also search for an existing PW lock.  The
3084          * VFS and page cache already protect us locally, so lots of readers/
3085          * writers can share a single PW lock.
3086          *
3087          * There are problems with conversion deadlocks, so instead of
3088          * converting a read lock to a write lock, we'll just enqueue a new
3089          * one.
3090          *
3091          * At some point we should cancel the read lock instead of making them
3092          * send us a blocking callback, but there are problems with canceling
3093          * locks out from other users right now, too. */
3094         mode = einfo->ei_mode;
3095         if (einfo->ei_mode == LCK_PR)
3096                 mode |= LCK_PW;
3097         /* Normal lock requests must wait for the LVB to be ready before
3098          * matching a lock; speculative lock requests do not need to,
3099          * because they will not actually use the lock. */
3100         if (!speculative)
3101                 search_flags |= LDLM_FL_LVB_READY;
3102         if (intent != 0)
3103                 search_flags |= LDLM_FL_BLOCK_GRANTED;
3104         if (mode == LCK_GROUP)
3105                 match_flags = LDLM_MATCH_GROUP;
3106         mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
3107                                          res_id, einfo->ei_type, policy, mode,
3108                                          &lockh, match_flags);
3109         if (mode) {
3110                 struct ldlm_lock *matched;
3111
3112                 if (*flags & LDLM_FL_TEST_LOCK)
3113                         RETURN(ELDLM_OK);
3114
3115                 matched = ldlm_handle2lock(&lockh);
3116                 if (speculative) {
3117                         /* This DLM lock request is speculative, and does not
3118                          * have an associated IO request. Therefore if there
3119                          * is already a DLM lock, it will just inform the
3120                          * caller to cancel the request for this stripe. */
3121                         lock_res_and_lock(matched);
3122                         if (ldlm_extent_equal(&policy->l_extent,
3123                             &matched->l_policy_data.l_extent))
3124                                 rc = -EEXIST;
3125                         else
3126                                 rc = -ECANCELED;
3127                         unlock_res_and_lock(matched);
3128
3129                         ldlm_lock_decref(&lockh, mode);
3130                         LDLM_LOCK_PUT(matched);
3131                         RETURN(rc);
3132                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
3133                         *flags |= LDLM_FL_LVB_READY;
3134
3135                         /* We already have a lock, and it's referenced. */
3136                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
3137
3138                         ldlm_lock_decref(&lockh, mode);
3139                         LDLM_LOCK_PUT(matched);
3140                         RETURN(ELDLM_OK);
3141                 } else {
3142                         ldlm_lock_decref(&lockh, mode);
3143                         LDLM_LOCK_PUT(matched);
3144                 }
3145         }
3146
3147         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
3148                 RETURN(-ENOLCK);
3149
3150         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3151         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3152
3153         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3154                               sizeof(*lvb), LVB_T_OST, &lockh, async);
3155         if (async) {
3156                 if (!rc) {
3157                         struct osc_enqueue_args *aa;
3158                         aa = ptlrpc_req_async_args(aa, req);
3159                         aa->oa_exp         = exp;
3160                         aa->oa_mode        = einfo->ei_mode;
3161                         aa->oa_type        = einfo->ei_type;
3162                         lustre_handle_copy(&aa->oa_lockh, &lockh);
3163                         aa->oa_upcall      = upcall;
3164                         aa->oa_cookie      = cookie;
3165                         aa->oa_speculative = speculative;
3166                         if (!speculative) {
3167                                 aa->oa_flags  = flags;
3168                                 aa->oa_lvb    = lvb;
3169                         } else {
3170                                 /* Speculative locks essentially enqueue
3171                                  * a DLM lock in advance, so we don't care
3172                                  * about the result of the enqueue. */
3173                                 aa->oa_lvb    = NULL;
3174                                 aa->oa_flags  = NULL;
3175                         }
3176
3177                         req->rq_interpret_reply = osc_enqueue_interpret;
3178                         ptlrpc_set_add_req(rqset, req);
3179                 }
3180                 RETURN(rc);
3181         }
3182
3183         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
3184                               flags, speculative, rc);
3185
3186         RETURN(rc);
3187 }
3188
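/*
 * Search for an existing extent lock covering the given extent.  If one
 * is found and @obj is non-NULL, bind the lock to the object and refresh
 * its cached LVB data when needed.  Returns the matched lock mode, or 0
 * if nothing usable was found.
 */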
3189 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
3190                    struct ldlm_res_id *res_id, enum ldlm_type type,
3191                    union ldlm_policy_data *policy, enum ldlm_mode mode,
3192                    __u64 *flags, struct osc_object *obj,
3193                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
3194 {
3195         struct obd_device *obd = exp->exp_obd;
3196         __u64 lflags = *flags;
3197         enum ldlm_mode rc;
3198         ENTRY;
3199
3200         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3201                 RETURN(-EIO);
3202
3203         /* Filesystem lock extents are extended to page boundaries so that
3204          * dealing with the page cache is a little smoother */
3205         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3206         policy->l_extent.end |= ~PAGE_MASK;
3207
3208         /* Next, search for already existing extent locks that will cover us */
3209         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
3210                                         res_id, type, policy, mode, lockh,
3211                                         match_flags);
3212         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
3213                 RETURN(rc);
3214
3215         if (obj != NULL) {
3216                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3217
3218                 LASSERT(lock != NULL);
3219                 if (osc_set_lock_data(lock, obj)) {
3220                         lock_res_and_lock(lock);
3221                         if (!ldlm_is_lvb_cached(lock)) {
3222                                 LASSERT(lock->l_ast_data == obj);
3223                                 osc_lock_lvb_update(env, obj, lock, NULL);
3224                                 ldlm_set_lvb_cached(lock);
3225                         }
3226                         unlock_res_and_lock(lock);
3227                 } else {
3228                         ldlm_lock_decref(lockh, rc);
3229                         rc = 0;
3230                 }
3231                 LDLM_LOCK_PUT(lock);
3232         }
3233         RETURN(rc);
3234 }
3235
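/* Interpret callback for an asynchronous OST_STATFS RPC: on success,
 * copy the reply into the caller's obd_statfs buffer, then invoke the
 * oi_cb_up completion callback with the result. */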
3236 static int osc_statfs_interpret(const struct lu_env *env,
3237                                 struct ptlrpc_request *req, void *args, int rc)
3238 {
3239         struct osc_async_args *aa = args;
3240         struct obd_statfs *msfs;
3241
3242         ENTRY;
3243         if (rc == -EBADR)
3244                 /*
3245                  * The request has in fact never been sent due to issues at
3246                  * a higher level (LOV).  Exit immediately since the caller
3247                  * is aware of the problem and takes care of the clean up.
3248                  */
3249                 RETURN(rc);
3250
3251         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3252             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3253                 GOTO(out, rc = 0);
3254
3255         if (rc != 0)
3256                 GOTO(out, rc);
3257
3258         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3259         if (msfs == NULL)
3260                 GOTO(out, rc = -EPROTO);
3261
3262         *aa->aa_oi->oi_osfs = *msfs;
3263 out:
3264         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3265
3266         RETURN(rc);
3267 }
3268
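/* Asynchronous statfs: answer from the locally cached obd_osfs when it
 * is fresh enough (obd_osfs_age >= max_age), otherwise queue an
 * OST_STATFS RPC on @rqset with osc_statfs_interpret() as the interpret
 * callback. */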
3269 static int osc_statfs_async(struct obd_export *exp,
3270                             struct obd_info *oinfo, time64_t max_age,
3271                             struct ptlrpc_request_set *rqset)
3272 {
3273         struct obd_device     *obd = class_exp2obd(exp);
3274         struct ptlrpc_request *req;
3275         struct osc_async_args *aa;
3276         int rc;
3277         ENTRY;
3278
3279         if (obd->obd_osfs_age >= max_age) {
3280                 CDEBUG(D_SUPER,
3281                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3282                        obd->obd_name, &obd->obd_osfs,
3283                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3284                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3285                 spin_lock(&obd->obd_osfs_lock);
3286                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3287                 spin_unlock(&obd->obd_osfs_lock);
3288                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3289                 if (oinfo->oi_cb_up)
3290                         oinfo->oi_cb_up(oinfo, 0);
3291
3292                 RETURN(0);
3293         }
3294
3295         /* We could possibly pass max_age in the request (as an absolute
3296          * timestamp or a "seconds.usec ago") so the target can avoid doing
3297          * extra calls into the filesystem if that isn't necessary (e.g.
3298          * during mount, where that would help a bit).  Having relative timestamps
3299          * is not so great if request processing is slow, while absolute
3300          * timestamps are not ideal because they need time synchronization. */
3301         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3302         if (req == NULL)
3303                 RETURN(-ENOMEM);
3304
3305         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3306         if (rc) {
3307                 ptlrpc_request_free(req);
3308                 RETURN(rc);
3309         }
3310         ptlrpc_request_set_replen(req);
3311         req->rq_request_portal = OST_CREATE_PORTAL;
3312         ptlrpc_at_set_req_timeout(req);
3313
3314         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3315                 /* procfs requests should not wait on stats, to avoid a deadlock */
3316                 req->rq_no_resend = 1;
3317                 req->rq_no_delay = 1;
3318         }
3319
3320         req->rq_interpret_reply = osc_statfs_interpret;
3321         aa = ptlrpc_req_async_args(aa, req);
3322         aa->aa_oi = oinfo;
3323
3324         ptlrpc_set_add_req(rqset, req);
3325         RETURN(0);
3326 }
3327
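/* Synchronous statfs: always sends an OST_STATFS RPC and waits for the
 * reply; unlike osc_statfs_async(), the cached obd_osfs is not
 * consulted. */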
3328 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3329                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3330 {
3331         struct obd_device     *obd = class_exp2obd(exp);
3332         struct obd_statfs     *msfs;
3333         struct ptlrpc_request *req;
3334         struct obd_import     *imp, *imp0;
3335         int rc;
3336         ENTRY;
3337
3338         /* Since the request might also come from lprocfs, we need to
3339          * sync this with client_disconnect_export() (Bug15684)
3340          */
3341         with_imp_locked(obd, imp0, rc)
3342                 imp = class_import_get(imp0);
3343         if (rc)
3344                 RETURN(rc);
3345
3346         /* We could possibly pass max_age in the request (as an absolute
3347          * timestamp or a "seconds.usec ago") so the target can avoid doing
3348          * extra calls into the filesystem if that isn't necessary (e.g.
3349          * during mount, where that would help a bit).  Having relative timestamps
3350          * is not so great if request processing is slow, while absolute
3351          * timestamps are not ideal because they need time synchronization. */
3352         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3353
3354         class_import_put(imp);
3355
3356         if (req == NULL)
3357                 RETURN(-ENOMEM);
3358
3359         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3360         if (rc) {
3361                 ptlrpc_request_free(req);
3362                 RETURN(rc);
3363         }
3364         ptlrpc_request_set_replen(req);
3365         req->rq_request_portal = OST_CREATE_PORTAL;
3366         ptlrpc_at_set_req_timeout(req);
3367
3368         if (flags & OBD_STATFS_NODELAY) {
3369                 /* procfs requests should not wait on stats, to avoid a deadlock */
3370                 req->rq_no_resend = 1;
3371                 req->rq_no_delay = 1;
3372         }
3373
3374         rc = ptlrpc_queue_wait(req);
3375         if (rc)
3376                 GOTO(out, rc);
3377
3378         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3379         if (msfs == NULL)
3380                 GOTO(out, rc = -EPROTO);
3381
3382         *osfs = *msfs;
3383
3384         EXIT;
3385 out:
3386         ptlrpc_req_finished(req);
3387         return rc;
3388 }
3389
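/* Handle OBD ioctls for this device: client recovery, getattr and
 * import (de)activation are recognized; anything else fails with
 * -ENOTTY. */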
3390 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3391                          void *karg, void __user *uarg)
3392 {
3393         struct obd_device *obd = exp->exp_obd;
3394         struct obd_ioctl_data *data;
3395         int rc;
3396
3397         ENTRY;
3398         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
3399                obd->obd_name, cmd, len, karg, uarg);
3400
3401         if (!try_module_get(THIS_MODULE)) {
3402                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3403                        module_name(THIS_MODULE));
3404                 RETURN(-EINVAL);
3405         }
3406
3407         switch (cmd) {
3408         case OBD_IOC_CLIENT_RECOVER:
3409                 if (unlikely(karg == NULL)) {
3410                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3411                                       rc = -EINVAL);
3412                         break;
3413                 }
3414                 data = karg;
3415                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3416                                            data->ioc_inlbuf1, 0);
3417                 if (rc > 0)
3418                         rc = 0;
3419                 break;
3420         case OBD_IOC_GETATTR:
3421                 if (unlikely(karg == NULL)) {
3422                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3423                                       rc = -EINVAL);
3424                         break;
3425                 }
3426                 data = karg;
3427                 rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
3428                 break;
3429 #ifdef IOC_OSC_SET_ACTIVE
3430         case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
3431 #endif
3432         case OBD_IOC_SET_ACTIVE:
3433                 if (unlikely(karg == NULL)) {
3434                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3435                                       rc = -EINVAL);
3436                         break;
3437                 }
3438                 data = karg;
3439                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3440                                               data->ioc_offset);
3441                 break;
3442         default:
3443                 rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized",
3444                                    -ENOTTY);
3445                 break;
3446         }
3447
3448         module_put(THIS_MODULE);
3449         return rc;
3450 }
3451
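/*
 * Handle "set info" requests from upper layers.  A few keys (checksum,
 * sptlrpc config, security context flush, LRU shrink) are handled
 * locally without an RPC; everything else is packed into an OST_SET_INFO
 * request, with KEY_GRANT_SHRINK using its own request format and
 * interpret callback.
 */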
3452 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3453                        u32 keylen, void *key, u32 vallen, void *val,
3454                        struct ptlrpc_request_set *set)
3455 {
3456         struct ptlrpc_request *req;
3457         struct obd_device *obd = exp->exp_obd;
3458         struct obd_import *imp = class_exp2cliimp(exp);
3459         char *tmp;
3460         int rc;
3461         ENTRY;
3462
3463         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3464
3465         if (KEY_IS(KEY_CHECKSUM)) {
3466                 if (vallen != sizeof(int))
3467                         RETURN(-EINVAL);
3468                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3469                 RETURN(0);
3470         }
3471
3472         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3473                 sptlrpc_conf_client_adapt(obd);
3474                 RETURN(0);
3475         }
3476
3477         if (KEY_IS(KEY_FLUSH_CTX)) {
3478                 sptlrpc_import_flush_my_ctx(imp);
3479                 RETURN(0);
3480         }
3481
3482         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3483                 struct client_obd *cli = &obd->u.cli;
3484                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3485                 long target = *(long *)val;
3486
3487                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3488                 *(long *)val -= nr;
3489                 RETURN(0);
3490         }
3491
3492         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3493                 RETURN(-EINVAL);
3494
3495         /*
3496          * We pass all other commands directly to OST. Since nobody calls osc
3497          * methods directly and everybody is supposed to go through LOV, we
3498          * assume lov checked invalid values for us.
3499          * The only recognised values so far are evict_by_nid and mds_conn.
3500          * Even if something bad goes through, we'd get a -EINVAL from OST
3501          * anyway.
3502          */
3503
3504         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3505                                                 &RQF_OST_SET_GRANT_INFO :
3506                                                 &RQF_OBD_SET_INFO);
3507         if (req == NULL)
3508                 RETURN(-ENOMEM);
3509
3510         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3511                              RCL_CLIENT, keylen);
3512         if (!KEY_IS(KEY_GRANT_SHRINK))
3513                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3514                                      RCL_CLIENT, vallen);
3515         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3516         if (rc) {
3517                 ptlrpc_request_free(req);
3518                 RETURN(rc);
3519         }
3520
3521         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3522         memcpy(tmp, key, keylen);
3523         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3524                                                         &RMF_OST_BODY :
3525                                                         &RMF_SETINFO_VAL);
3526         memcpy(tmp, val, vallen);
3527
3528         if (KEY_IS(KEY_GRANT_SHRINK)) {
3529                 struct osc_grant_args *aa;
3530                 struct obdo *oa;
3531
3532                 aa = ptlrpc_req_async_args(aa, req);
3533                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3534                 if (!oa) {
3535                         ptlrpc_req_finished(req);
3536                         RETURN(-ENOMEM);
3537                 }
3538                 *oa = ((struct ost_body *)val)->oa;
3539                 aa->aa_oa = oa;
3540                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3541         }
3542
3543         ptlrpc_request_set_replen(req);
3544         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3545                 LASSERT(set != NULL);
3546                 ptlrpc_set_add_req(set, req);
3547                 ptlrpc_check_set(NULL, set);
3548         } else {
3549                 ptlrpcd_add_req(req);
3550         }
3551
3552         RETURN(0);
3553 }
3554 EXPORT_SYMBOL(osc_set_info_async);
3555
3556 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3557                   struct obd_device *obd, struct obd_uuid *cluuid,
3558                   struct obd_connect_data *data, void *localdata)
3559 {
3560         struct client_obd *cli = &obd->u.cli;
3561
3562         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3563                 long lost_grant;
3564                 long grant;
3565
3566                 spin_lock(&cli->cl_loi_list_lock);
3567                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3568                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3569                         /* restore ocd_grant_blkbits as client page bits */
3570                         data->ocd_grant_blkbits = PAGE_SHIFT;
3571                         grant += cli->cl_dirty_grant;
3572                 } else {
3573                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3574                 }
3575                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3576                 lost_grant = cli->cl_lost_grant;
3577                 cli->cl_lost_grant = 0;
3578                 spin_unlock(&cli->cl_loi_list_lock);
3579
3580                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3581                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3582                        data->ocd_version, data->ocd_grant, lost_grant);
3583         }
3584
3585         RETURN(0);
3586 }
3587 EXPORT_SYMBOL(osc_reconnect);
3588
3589 int osc_disconnect(struct obd_export *exp)
3590 {
3591         struct obd_device *obd = class_exp2obd(exp);
3592         int rc;
3593
3594         rc = client_disconnect_export(exp);
3595         /**
3596          * Initially we put del_shrink_grant before disconnect_export, but it
3597          * causes the following problem if setup (connect) and cleanup
3598          * (disconnect) are tangled together.
3599          *      connect p1                     disconnect p2
3600          *   ptlrpc_connect_import
3601          *     ...............               class_manual_cleanup
3602          *                                     osc_disconnect
3603          *                                     del_shrink_grant
3604          *   ptlrpc_connect_interrupt
3605          *     osc_init_grant
3606          *   add this client to shrink list
3607          *                                      cleanup_osc
3608          * Bang! The grant shrink thread triggers the shrink. BUG18662
3609          */
3610         osc_del_grant_list(&obd->u.cli);
3611         return rc;
3612 }
3613 EXPORT_SYMBOL(osc_disconnect);
3614
3615 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3616                                  struct hlist_node *hnode, void *arg)
3617 {
3618         struct lu_env *env = arg;
3619         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3620         struct ldlm_lock *lock;
3621         struct osc_object *osc = NULL;
3622         ENTRY;
3623
3624         lock_res(res);
3625         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3626                 if (lock->l_ast_data != NULL && osc == NULL) {
3627                         osc = lock->l_ast_data;
3628                         cl_object_get(osc2cl(osc));
3629                 }
3630
3631                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3632                  * by the 2nd round of ldlm_namespace_clean() call in
3633                  * osc_import_event(). */
3634                 ldlm_clear_cleaned(lock);
3635         }
3636         unlock_res(res);
3637
3638         if (osc != NULL) {
3639                 osc_object_invalidate(env, osc);
3640                 cl_object_put(env, osc2cl(osc));
3641         }
3642
3643         RETURN(0);
3644 }
3645 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3646
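/*
 * React to import state change events: reset grant accounting on
 * disconnect, flush cached pages and clean up the lock namespace on
 * invalidation, (re)initialize grants and the request portal when the
 * connect data arrives, and notify the observer (e.g. LOV) of the event.
 */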
3647 static int osc_import_event(struct obd_device *obd, struct obd_import *imp,
3648                             enum obd_import_event event)
3649 {
3650         struct client_obd *cli;
3651         int rc = 0;
3652
3653         ENTRY;
3654         if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
3655                 RETURN(-ENODEV);
3656
3657         switch (event) {
3658         case IMP_EVENT_DISCON: {
3659                 cli = &obd->u.cli;
3660                 if (!cli)
3661                         RETURN(-ENODEV);
3662                 spin_lock(&cli->cl_loi_list_lock);
3663                 cli->cl_avail_grant = 0;
3664                 cli->cl_lost_grant = 0;
3665                 spin_unlock(&cli->cl_loi_list_lock);
3666                 break;
3667         }
3668         case IMP_EVENT_INACTIVE: {
3669                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3670                 break;
3671         }
3672         case IMP_EVENT_INVALIDATE: {
3673                 struct ldlm_namespace *ns = obd->obd_namespace;
3674                 struct lu_env *env;
3675                 __u16 refcheck;
3676
3677                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3678
3679                 env = cl_env_get(&refcheck);
3680                 if (!IS_ERR(env)) {
3681                         osc_io_unplug(env, &obd->u.cli, NULL);
3682
3683                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3684                                                  osc_ldlm_resource_invalidate,
3685                                                  env, 0);
3686                         cl_env_put(env, &refcheck);
3687
3688                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3689                 } else {
3690                         rc = PTR_ERR(env);
3691                 }
3692                 break;
3693         }
3694         case IMP_EVENT_ACTIVE: {
3695                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3696                 break;
3697         }
3698         case IMP_EVENT_OCD: {
3699                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3700
3701                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3702                         osc_init_grant(&obd->u.cli, ocd);
3703
3704                 /* See bug 7198 */
3705                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3706                         imp->imp_client->cli_request_portal =
3707                                 OST_REQUEST_PORTAL;
3708
3709                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3710                 break;
3711         }
3712         case IMP_EVENT_DEACTIVATE: {
3713                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3714                 break;
3715         }
3716         case IMP_EVENT_ACTIVATE: {
3717                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3718                 break;
3719         }
3720         default:
3721                 CERROR("%s: Unknown import event %d: rc = %d\n",
3722                        obd->obd_name, event, -EINVAL);
3723                 LBUG();
3724         }
3725         RETURN(rc);
3726 }
3727
3728 /**
3729  * Determine whether the lock can be canceled before replaying the lock
3730  * during recovery, see bug16774 for detailed information.
3731  *
3732  * \retval zero the lock can't be canceled
3733  * \retval other ok to cancel
3734  */
3735 static int osc_cancel_weight(struct ldlm_lock *lock)
3736 {
3737         /*
3738          * Cancel all unused and granted extent lock.
3739          */
3740         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3741             ldlm_is_granted(lock) &&
3742             osc_ldlm_weigh_ast(lock) == 0)
3743                 RETURN(1);
3744
3745         RETURN(0);
3746 }
3747
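/* Work callback for cli->cl_writeback_work: kick off any pending BRW
 * RPCs for this client via osc_io_unplug(). */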
3748 static int brw_queue_work(const struct lu_env *env, void *data)
3749 {
3750         struct client_obd *cli = data;
3751
3752         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3753
3754         osc_io_unplug(env, cli, NULL);
3755         RETURN(0);
3756 }
3757
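/*
 * Common part of OSC device setup (exported for sharing, see the
 * EXPORT_SYMBOL below): take a ptlrpcd reference, run the generic client
 * setup, create the writeback and LRU work items, and set up quota
 * state.
 */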
3758 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3759 {
3760         struct client_obd *cli = &obd->u.cli;
3761         void *handler;
3762         int rc;
3763
3764         ENTRY;
3765
3766         rc = ptlrpcd_addref();
3767         if (rc)
3768                 RETURN(rc);
3769
3770         rc = client_obd_setup(obd, lcfg);
3771         if (rc)
3772                 GOTO(out_ptlrpcd, rc);
3773
3774
3775         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3776         if (IS_ERR(handler))
3777                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3778         cli->cl_writeback_work = handler;
3779
3780         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3781         if (IS_ERR(handler))
3782                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3783         cli->cl_lru_work = handler;
3784
3785         rc = osc_quota_setup(obd);
3786         if (rc)
3787                 GOTO(out_ptlrpcd_work, rc);
3788
3789         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3790         cli->cl_root_squash = 0;
3791         osc_update_next_shrink(cli);
3792
3793         RETURN(rc);
3794
3795 out_ptlrpcd_work:
3796         if (cli->cl_writeback_work != NULL) {
3797                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3798                 cli->cl_writeback_work = NULL;
3799         }
3800         if (cli->cl_lru_work != NULL) {
3801                 ptlrpcd_destroy_work(cli->cl_lru_work);
3802                 cli->cl_lru_work = NULL;
3803         }
3804         client_obd_cleanup(obd);
3805 out_ptlrpcd:
3806         ptlrpcd_decref();
3807         RETURN(rc);
3808 }
3809 EXPORT_SYMBOL(osc_setup_common);
3810
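/* Full OSC device setup: run the common setup above, register tunables,
 * pre-populate the shared request pool up to osc_reqpool_maxreqcount,
 * register the cancel-weight callback, add this client to the grant
 * shrink list, and configure the idle disconnect timeout. */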
3811 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3812 {
3813         struct client_obd *cli = &obd->u.cli;
3814         int                adding;
3815         int                added;
3816         int                req_count;
3817         int                rc;
3818
3819         ENTRY;
3820
3821         rc = osc_setup_common(obd, lcfg);
3822         if (rc < 0)
3823                 RETURN(rc);
3824
3825         rc = osc_tunables_init(obd);
3826         if (rc)
3827                 RETURN(rc);
3828
3829         /*
3830          * We try to control the total number of requests with an upper limit
3831          * osc_reqpool_maxreqcount. There might be some race which will cause
3832          * over-limit allocation, but it is fine.
3833          */
3834         req_count = atomic_read(&osc_pool_req_count);
3835         if (req_count < osc_reqpool_maxreqcount) {
3836                 adding = cli->cl_max_rpcs_in_flight + 2;
3837                 if (req_count + adding > osc_reqpool_maxreqcount)
3838                         adding = osc_reqpool_maxreqcount - req_count;
3839
3840                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3841                 atomic_add(added, &osc_pool_req_count);
3842         }
3843
3844         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3845
3846         spin_lock(&osc_shrink_lock);
3847         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3848         spin_unlock(&osc_shrink_lock);
3849         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3850         cli->cl_import->imp_idle_debug = D_HA;
3851
3852         RETURN(0);
3853 }
3854
3855 int osc_precleanup_common(struct obd_device *obd)
3856 {
3857         struct client_obd *cli = &obd->u.cli;
3858         ENTRY;
3859
3860         /* LU-464
3861          * for echo client, export may be on zombie list, wait for
3862          * zombie thread to cull it, because cli.cl_import will be
3863          * cleared in client_disconnect_export():
3864          *   class_export_destroy() -> obd_cleanup() ->
3865          *   echo_device_free() -> echo_client_cleanup() ->
3866          *   obd_disconnect() -> osc_disconnect() ->
3867          *   client_disconnect_export()
3868          */
3869         obd_zombie_barrier();
3870         if (cli->cl_writeback_work) {
3871                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3872                 cli->cl_writeback_work = NULL;
3873         }
3874
3875         if (cli->cl_lru_work) {
3876                 ptlrpcd_destroy_work(cli->cl_lru_work);
3877                 cli->cl_lru_work = NULL;
3878         }
3879
3880         obd_cleanup_client_import(obd);
3881         RETURN(0);
3882 }
3883 EXPORT_SYMBOL(osc_precleanup_common);
3884
3885 static int osc_precleanup(struct obd_device *obd)
3886 {
3887         ENTRY;
3888
3889         osc_precleanup_common(obd);
3890
3891         ptlrpc_lprocfs_unregister_obd(obd);
3892         RETURN(0);
3893 }
3894
3895 int osc_cleanup_common(struct obd_device *obd)
3896 {
3897         struct client_obd *cli = &obd->u.cli;
3898         int rc;
3899
3900         ENTRY;
3901
3902         spin_lock(&osc_shrink_lock);
3903         list_del(&cli->cl_shrink_list);
3904         spin_unlock(&osc_shrink_lock);
3905
3906         /* lru cleanup */
3907         if (cli->cl_cache != NULL) {
3908                 LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
3909                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3910                 list_del_init(&cli->cl_lru_osc);
3911                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3912                 cli->cl_lru_left = NULL;
3913                 cl_cache_decref(cli->cl_cache);
3914                 cli->cl_cache = NULL;
3915         }
3916
3917         /* free memory of osc quota cache */
3918         osc_quota_cleanup(obd);
3919
3920         rc = client_obd_cleanup(obd);
3921
3922         ptlrpcd_decref();
3923         RETURN(rc);
3924 }
3925 EXPORT_SYMBOL(osc_cleanup_common);
3926
3927 static const struct obd_ops osc_obd_ops = {
3928         .o_owner                = THIS_MODULE,
3929         .o_setup                = osc_setup,
3930         .o_precleanup           = osc_precleanup,
3931         .o_cleanup              = osc_cleanup_common,
3932         .o_add_conn             = client_import_add_conn,
3933         .o_del_conn             = client_import_del_conn,
3934         .o_connect              = client_connect_import,
3935         .o_reconnect            = osc_reconnect,
3936         .o_disconnect           = osc_disconnect,
3937         .o_statfs               = osc_statfs,
3938         .o_statfs_async         = osc_statfs_async,
3939         .o_create               = osc_create,
3940         .o_destroy              = osc_destroy,
3941         .o_getattr              = osc_getattr,
3942         .o_setattr              = osc_setattr,
3943         .o_iocontrol            = osc_iocontrol,
3944         .o_set_info_async       = osc_set_info_async,
3945         .o_import_event         = osc_import_event,
3946         .o_quotactl             = osc_quotactl,
3947 };
3948
3949 LIST_HEAD(osc_shrink_list);
3950 DEFINE_SPINLOCK(osc_shrink_lock);
3951 bool osc_page_cache_shrink_enabled = true;
3952
3953 #ifdef HAVE_SHRINKER_COUNT
3954 static struct ll_shrinker_ops osc_cache_sh_ops = {
3955         .count_objects  = osc_cache_shrink_count,
3956         .scan_objects   = osc_cache_shrink_scan,
3957         .seeks          = DEFAULT_SEEKS,
3958 };
3959 #else
3960 static int osc_cache_shrink(struct shrinker *shrinker,
3961                             struct shrink_control *sc)
3962 {
3963         (void)osc_cache_shrink_scan(shrinker, sc);
3964
3965         return osc_cache_shrink_count(shrinker, sc);
3966 }
3967
3968 static struct ll_shrinker_ops osc_cache_sh_ops = {
3969         .shrink   = osc_cache_shrink,
3970         .seeks    = DEFAULT_SEEKS,
3971 };
3972 #endif
3973
3974 static struct shrinker *osc_cache_shrinker;
3975
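/* Module initialization: set up the kmem caches, the page cache
 * shrinker and the shared BRW request pool (sized from
 * osc_reqpool_mem_max, with the request size rounded up to a power of
 * two), then start the grant work and register the OSC OBD type. */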
3976 static int __init osc_init(void)
3977 {
3978         unsigned int reqpool_size;
3979         unsigned int reqsize;
3980         int rc;
3981         ENTRY;
3982
3983         /* Print the address of _any_ initialized kernel symbol from this
3984          * module, to allow debugging with gdb that doesn't support data
3985          * symbols from modules. */
3986         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3987
3988         rc = libcfs_setup();
3989         if (rc)
3990                 return rc;
3991
3992         rc = lu_kmem_init(osc_caches);
3993         if (rc)
3994                 RETURN(rc);
3995
3996         osc_cache_shrinker = ll_shrinker_create(&osc_cache_sh_ops, 0,
3997                                                 "osc_cache");
3998         if (IS_ERR(osc_cache_shrinker))
3999                 GOTO(out_kmem, rc = PTR_ERR(osc_cache_shrinker));
4000
4001         /* This is obviously too much memory, only prevent overflow here */
4002         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
4003                 GOTO(out_shrinker, rc = -EINVAL);
4004
4005         reqpool_size = osc_reqpool_mem_max << 20;
4006
4007         reqsize = 1;
4008         while (reqsize < OST_IO_MAXREQSIZE)
4009                 reqsize = reqsize << 1;
4010
4011         /*
4012          * We don't enlarge the request count in OSC pool according to
4013          * cl_max_rpcs_in_flight. The allocation from the pool will only be
4014          * tried after a normal allocation has failed. So a small OSC pool
4015          * won't cause much performance degradation in most cases.
4016          */
4017         osc_reqpool_maxreqcount = reqpool_size / reqsize;
4018
4019         atomic_set(&osc_pool_req_count, 0);
4020         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
4021                                           ptlrpc_add_rqs_to_pool);
4022
4023         if (osc_rq_pool == NULL)
4024                 GOTO(out_shrinker, rc = -ENOMEM);
4025
4026         rc = osc_start_grant_work();
4027         if (rc != 0)
4028                 GOTO(out_req_pool, rc);
4029
4030         rc = class_register_type(&osc_obd_ops, NULL, true,
4031                                  LUSTRE_OSC_NAME, &osc_device_type);
4032         if (rc < 0)
4033                 GOTO(out_stop_grant, rc);
4034
4035         RETURN(rc);
4036
4037 out_stop_grant:
4038         osc_stop_grant_work();
4039 out_req_pool:
4040         ptlrpc_free_rq_pool(osc_rq_pool);
4041 out_shrinker:
4042         shrinker_free(osc_cache_shrinker);
4043 out_kmem:
4044         lu_kmem_fini(osc_caches);
4045
4046         RETURN(rc);
4047 }
4048
4049 static void __exit osc_exit(void)
4050 {
4051         class_unregister_type(LUSTRE_OSC_NAME);
4052         ptlrpc_free_rq_pool(osc_rq_pool);
4053         osc_stop_grant_work();
4054         shrinker_free(osc_cache_shrinker);
4055         lu_kmem_fini(osc_caches);
4056 }
4057
4058 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4059 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4060 MODULE_VERSION(LUSTRE_VERSION_STRING);
4061 MODULE_LICENSE("GPL");
4062
4063 module_init(osc_init);
4064 module_exit(osc_exit);