LU-16713 llite: writeback/commit pages under memory pressure
lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_ioctl_old.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_osc.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_internal.h"
#include <lnet/lnet_rdma.h>

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
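/* Fetch the attributes of a single OST object with a synchronous
 * OST_GETATTR RPC. The reply obdo is unpacked into @oa, and the client's
 * preferred bulk RPC size is reported back as the object blocksize.
 */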
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

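/* Issue an OST_SETATTR RPC without blocking the caller. If @rqset is NULL
 * the request is handed to ptlrpcd and the reply is ignored; otherwise it
 * is added to @rqset and @upcall(@cookie, rc) is invoked from
 * osc_setattr_interpret() once the reply has been processed.
 */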
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

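/* Synchronously create an object on the OST. This path is only used for
 * echo objects: the LASSERT below requires the object sequence to be an
 * echo sequence.
 */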
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

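/* Send an OST_PUNCH (truncate/hole-punch) request via ptlrpcd without
 * waiting for the reply; @upcall is invoked with @cookie and the result
 * from osc_setattr_interpret() when the reply arrives.
 */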
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the reply is interpreted
 * @cookie:     Opaque cookie passed to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (the standard preallocate operation) is supported
 * currently; other mode flags are not supported yet. ftruncate(2) and
 * truncate(2) are handled via a SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

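/* Flush an object's cached data on the OST with an OST_SYNC RPC added to
 * @rqset. On reply, osc_sync_interpret() refreshes the object's blocks
 * attribute and calls @upcall(@cookie, rc).
 */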
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa's object id. Found locks are added to the @cancels list. Returns
 * the number of locks added to the @cancels list.
 */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC.
         */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

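/* Try to reserve a slot for one more destroy RPC. Returns 1 if the
 * in-flight count could be raised without exceeding cl_max_rpcs_in_flight,
 * 0 otherwise; in the latter case the increment is backed out and a waiter
 * is woken if a slot was freed concurrently.
 */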
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

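/* Destroy an OST object. Conflicting local PW locks are cancelled first
 * (with LDLM_FL_DISCARD_DATA) and packed into the OST_DESTROY request as
 * early lock cancels. The number of destroys in flight is throttled to
 * cl_max_rpcs_in_flight, and the reply is not waited for.
 */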
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

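/* Fill in the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) so that each request also tells the server how much dirty
 * data and grant the client holds and how much more grant it would like.
 */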
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1).
                 */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}
EXPORT_SYMBOL(osc_update_next_shrink);

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal.
 */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

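/* Shrink avail_grant down to @target_bytes (but never below one RPC worth)
 * by returning the excess to the server through a KEY_GRANT_SHRINK
 * set_info RPC. If sending fails, the grant is added back locally.
 */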
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the target. We also
         * don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance.
         */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

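/* Decide whether it is time to shrink this client's grant: the import must
 * support grant shrinking and not have it disabled, the shrink deadline
 * must be (nearly) due, the import must be FULL, and more than one RPC
 * worth of grant must still be available.
 */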
static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching.
                 */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

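/* Periodic worker that walks the registered clients, shrinks grant for at
 * most GRANT_SHRINK_RPC_BATCH of them per pass, and then re-arms itself
 * for the earliest upcoming shrink deadline.
 */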
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
EXPORT_SYMBOL(osc_schedule_grant_work);

/**
 * Start the periodic grant work that returns grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we are evicted but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must be 0
         * already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->bp_count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->bp_page) +
                                (pga[i]->bp_off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
                        kunmap(pga[i]->bp_page);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->bp_count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->bp_count);
                kunmap(pga[i]->bp_page);
                i++;
        }
}

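/* Verify the per-niobuf return codes in a BRW_WRITE reply: fail if the RC
 * vector is missing or short, propagate the first negative rc, treat any
 * other nonzero rc as a protocol error, and check that the bulk moved
 * exactly the requested number of bytes.
 */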
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->bp_flag != p2->bp_flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC |
                                  OBD_BRW_SYS_RESOURCE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->bp_flag, p2->bp_flag);
                }
                return 0;
        }

        return (p1->bp_off + p1->bp_count == p2->bp_off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
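/* Compute a T10-PI style bulk checksum: per-sector DIF guard tags are
 * generated with @fn into a scratch page and folded into a single
 * @check_sum using the OBD_CKSUM_T10_TOP hash algorithm.
 */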
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum, bool resend)
{
        struct ahash_request *req;
        /* use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __be16 *guard_start;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        unsigned int bufsize = sizeof(cksum);
        int rc = 0, rc2;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__be16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        CDEBUG(D_PAGE | (resend ? D_HA : 0),
               "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
               guard_number, resend, nob, pg_count);

        while (nob > 0 && pg_count > 0) {
                int off = pga[i]->bp_off & ~PAGE_MASK;
                unsigned int count =
                        pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
                int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
                                        (off / sector_size);

                if (guards_needed > guard_number - used_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->bp_page);

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->bp_page);
                }

                /*
                 * The remaining guard slots must be able to hold the
                 * checksums of a whole page.
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
                                                  pga[i]->bp_off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (unlikely(resend))
                        CDEBUG(D_PAGE | D_HA,
                               "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
                               i, used, pga[i]->bp_off & ~PAGE_MASK, count,
                               (int)(used * sizeof(*guard_start)),
                               guard_start + used_number);
                if (rc)
                        break;

                used_number += used;
                nob -= pga[i]->bp_count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out_hash, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

out_hash:
        rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
        if (!rc)
                rc = rc2;
        if (rc == 0) {
                /* For sending we only compute the wrong checksum instead
                 * of corrupting the data so it is still correct on a redo */
                if (opc == OST_WRITE &&
                                CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                        cksum++;

                *check_sum = cksum;
        }
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

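/* Compute a plain (non-T10) bulk checksum by hashing each page fragment in
 * turn with the algorithm selected by @cksum_type.
 */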
1301 static int osc_checksum_bulk(int nob, size_t pg_count,
1302                              struct brw_page **pga, int opc,
1303                              enum cksum_types cksum_type,
1304                              u32 *cksum)
1305 {
1306         int                             i = 0;
1307         struct ahash_request           *req;
1308         unsigned int                    bufsize;
1309         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1310
1311         LASSERT(pg_count > 0);
1312
1313         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1314         if (IS_ERR(req)) {
1315                 CERROR("Unable to initialize checksum hash %s\n",
1316                        cfs_crypto_hash_name(cfs_alg));
1317                 return PTR_ERR(req);
1318         }
1319
1320         while (nob > 0 && pg_count > 0) {
1321                 unsigned int count =
1322                         pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
1323
1324                 /* corrupt the data before we compute the checksum, to
1325                  * simulate an OST->client data error */
1326                 if (i == 0 && opc == OST_READ &&
1327                     CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1328                         unsigned char *ptr = kmap(pga[i]->bp_page);
1329                         int off = pga[i]->bp_off & ~PAGE_MASK;
1330
1331                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1332                         kunmap(pga[i]->bp_page);
1333                 }
1334                 cfs_crypto_hash_update_page(req, pga[i]->bp_page,
1335                                             pga[i]->bp_off & ~PAGE_MASK,
1336                                             count);
1337                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
1338                                (int)(pga[i]->bp_off & ~PAGE_MASK));
1339
1340                 nob -= pga[i]->bp_count;
1341                 pg_count--;
1342                 i++;
1343         }
1344
1345         bufsize = sizeof(*cksum);
1346         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1347
1348         /* For sending we only compute the wrong checksum instead
1349          * of corrupting the data so it is still correct on a redo */
1350         if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1351                 (*cksum)++;
1352
1353         return 0;
1354 }
1355
1356 static int osc_checksum_bulk_rw(const char *obd_name,
1357                                 enum cksum_types cksum_type,
1358                                 int nob, size_t pg_count,
1359                                 struct brw_page **pga, int opc,
1360                                 u32 *check_sum, bool resend)
1361 {
1362         obd_dif_csum_fn *fn = NULL;
1363         int sector_size = 0;
1364         int rc;
1365
1366         ENTRY;
1367         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1368
1369         if (fn)
1370                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1371                                              opc, fn, sector_size, check_sum,
1372                                              resend);
1373         else
1374                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1375                                        check_sum);
1376
1377         RETURN(rc);
1378 }
1379
1380 #ifdef CONFIG_LL_ENCRYPTION
1381 /**
1382  * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
1383  * @srcpage:      The locked pagecache page containing the block(s) to encrypt
1384  * @dstpage:      The page to put encryption result
1385  * @len:       Total size of the block(s) to encrypt.  Must be a nonzero
1386  *              multiple of the filesystem's block size.
1387  * @offs:      Byte offset within @page of the first block to encrypt.  Must be
1388  *              a multiple of the filesystem's block size.
1389  * @gfp_flags: Memory allocation flags
1390  *
1391  * This overlay function is necessary to be able to provide our own bounce page.
1392  */
1393 static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
1394                                                  struct page *dstpage,
1395                                                  unsigned int len,
1396                                                  unsigned int offs,
1397                                                  gfp_t gfp_flags)
1398
1399 {
1400         const struct inode *inode = srcpage->mapping->host;
1401         const unsigned int blockbits = inode->i_blkbits;
1402         const unsigned int blocksize = 1 << blockbits;
1403         u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
1404                 (offs >> blockbits);
1405         unsigned int i;
1406         int err;
1407
1408         if (unlikely(!dstpage))
1409                 return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
1410                                                         gfp_flags);
1411
1412         if (WARN_ON_ONCE(!PageLocked(srcpage)))
1413                 return ERR_PTR(-EINVAL);
1414
1415         if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
1416                 return ERR_PTR(-EINVAL);
1417
1418         /* Set PagePrivate2 for disambiguation in
1419          * osc_finalize_bounce_page().
1420          * It means cipher page was not allocated by llcrypt.
1421          */
1422         SetPagePrivate2(dstpage);
1423
1424         for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
1425                 err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
1426                                             i, lblk_num, gfp_flags);
1427                 if (err)
1428                         return ERR_PTR(err);
1429         }
1430         SetPagePrivate(dstpage);
1431         set_page_private(dstpage, (unsigned long)srcpage);
1432         return dstpage;
1433 }
1434
1435 /**
1436  * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
1437  *
1438  * This overlay function is necessary to handle bounce pages
1439  * allocated by ourselves.
1440  */
1441 static inline void osc_finalize_bounce_page(struct page **pagep)
1442 {
1443         struct page *page = *pagep;
1444
1445         /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
1446          * to indicate the cipher page was allocated by ourselves.
1447          * So we must not free it via llcrypt.
1448          */
1449         if (unlikely(!page || !PagePrivate2(page)))
1450                 return llcrypt_finalize_bounce_page(pagep);
1451
1452         if (llcrypt_is_bounce_page(page)) {
1453                 *pagep = llcrypt_pagecache_page(page);
1454                 ClearPagePrivate2(page);
1455                 set_page_private(page, (unsigned long)NULL);
1456                 ClearPagePrivate(page);
1457         }
1458 }
1459 #else /* !CONFIG_LL_ENCRYPTION */
1460 #define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
1461         llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
1462 #define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
1463 #endif
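/*
 * Usage sketch, for illustration only (variable names here are
 * hypothetical): the write path below passes a page pre-allocated from
 * the sptlrpc encryption pool as @dstpage, while passing a NULL @dstpage
 * falls back to llcrypt allocating the bounce page itself:
 *
 *	struct page *bounce;
 *
 *	bounce = osc_encrypt_pagecache_blocks(clear_page, pool_page,
 *					      nunits, 0, GFP_NOFS);
 *	if (!IS_ERR(bounce)) {
 *		... transfer the cipher text held in bounce ...
 *		osc_finalize_bounce_page(&bounce);
 *	}
 */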
1464
1465 static inline void osc_release_bounce_pages(struct brw_page **pga,
1466                                             u32 page_count)
1467 {
1468 #ifdef HAVE_LUSTRE_CRYPTO
1469         struct page **pa = NULL;
1470         int i, j = 0;
1471
1472         if (!pga[0])
1473                 return;
1474
1475 #ifdef CONFIG_LL_ENCRYPTION
1476         if (PageChecked(pga[0]->bp_page)) {
1477                 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1478                 if (!pa)
1479                         return;
1480         }
1481 #endif
1482
1483         for (i = 0; i < page_count; i++) {
1484                 /* Bounce pages used by osc_encrypt_pagecache_blocks()
1485                  * called from osc_brw_prep_request()
1486                  * are identified thanks to the PageChecked flag.
1487                  */
1488                 if (PageChecked(pga[i]->bp_page)) {
1489                         if (pa)
1490                                 pa[j++] = pga[i]->bp_page;
1491                         osc_finalize_bounce_page(&pga[i]->bp_page);
1492                 }
1493                 pga[i]->bp_count -= pga[i]->bp_count_diff;
1494                 pga[i]->bp_off += pga[i]->bp_off_diff;
1495         }
1496
1497         if (pa) {
1498                 sptlrpc_enc_pool_put_pages_array(pa, j);
1499                 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
1500         }
1501 #endif
1502 }
1503
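/*
 * osc_brw_prep_request() - prepare a bulk read/write RPC
 *
 * Encrypt the pages of @pga when the backing inode requires it, choose
 * between short I/O and a bulk descriptor, pack the ost_body, obd_ioobj
 * and niobuf_remote buffers, and compute the bulk checksum for writes.
 * On success the prepared request is returned in @reqp for the caller to
 * send; @resend marks the RPC as a recovery resend.
 */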
1504 static int
1505 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1506                      u32 page_count, struct brw_page **pga,
1507                      struct ptlrpc_request **reqp, int resend)
1508 {
1509         struct ptlrpc_request *req;
1510         struct ptlrpc_bulk_desc *desc;
1511         struct ost_body *body;
1512         struct obd_ioobj *ioobj;
1513         struct niobuf_remote *niobuf;
1514         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1515         struct osc_brw_async_args *aa;
1516         struct req_capsule *pill;
1517         struct brw_page *pg_prev;
1518         void *short_io_buf;
1519         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1520         struct inode *inode = NULL;
1521         bool directio = false;
1522         bool gpu = false;
1523         bool enable_checksum = true;
1524         struct cl_page *clpage;
1525
1526         ENTRY;
1527         if (pga[0]->bp_page) {
1528                 clpage = oap2cl_page(brw_page2oap(pga[0]));
1529                 inode = clpage->cp_inode;
1530                 if (clpage->cp_type == CPT_TRANSIENT)
1531                         directio = true;
1532         }
1533         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1534                 RETURN(-ENOMEM); /* Recoverable */
1535         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1536                 RETURN(-EINVAL); /* Fatal */
1537
1538         if ((cmd & OBD_BRW_WRITE) != 0) {
1539                 opc = OST_WRITE;
1540                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1541                                                 osc_rq_pool,
1542                                                 &RQF_OST_BRW_WRITE);
1543         } else {
1544                 opc = OST_READ;
1545                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1546         }
1547         if (req == NULL)
1548                 RETURN(-ENOMEM);
1549
1550         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1551             llcrypt_has_encryption_key(inode)) {
1552                 struct page **pa = NULL;
1553
1554 #ifdef CONFIG_LL_ENCRYPTION
1555                 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1556                 if (pa == NULL) {
1557                         ptlrpc_request_free(req);
1558                         RETURN(-ENOMEM);
1559                 }
1560
1561                 rc = sptlrpc_enc_pool_get_pages_array(pa, page_count);
1562                 if (rc) {
1563                         CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
1564                                rc);
1565                         ptlrpc_request_free(req);
1566                         RETURN(rc);
1567                 }
1568 #endif
1569
1570                 for (i = 0; i < page_count; i++) {
1571                         struct brw_page *brwpg = pga[i];
1572                         struct page *data_page = NULL;
1573                         bool retried = false;
1574                         bool lockedbymyself;
1575                         u32 nunits =
1576                                 (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
1577                         struct address_space *map_orig = NULL;
1578                         pgoff_t index_orig;
1579
1580 retry_encrypt:
1581                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1582                         /* The page can already be locked when we arrive here.
1583                          * This is possible when cl_page_assume/vvp_page_assume
1584                          * is stuck on wait_on_page_writeback with page lock
1585                          * held. In this case there is no risk for the lock to
1586                          * be released while we are doing our encryption
1587                          * processing, because writeback against that page will
1588                          * end in vvp_page_completion_write/cl_page_completion,
1589                          * which happens only once the page is fully processed.
1590                          */
1591                         lockedbymyself = trylock_page(brwpg->bp_page);
1592                         if (directio) {
1593                                 map_orig = brwpg->bp_page->mapping;
1594                                 brwpg->bp_page->mapping = inode->i_mapping;
1595                                 index_orig = brwpg->bp_page->index;
1596                                 clpage = oap2cl_page(brw_page2oap(brwpg));
1597                                 brwpg->bp_page->index = clpage->cp_page_index;
1598                         }
1599                         data_page =
1600                                 osc_encrypt_pagecache_blocks(brwpg->bp_page,
1601                                                             pa ? pa[i] : NULL,
1602                                                             nunits, 0,
1603                                                             GFP_NOFS);
1604                         if (directio) {
1605                                 brwpg->bp_page->mapping = map_orig;
1606                                 brwpg->bp_page->index = index_orig;
1607                         }
1608                         if (lockedbymyself)
1609                                 unlock_page(brwpg->bp_page);
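                        /* Encryption may transiently fail with -ENOMEM, e.g.
                         * when a bounce page cannot be allocated; retry this
                         * page once before unwinding the whole request.
                         */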
1610                         if (IS_ERR(data_page)) {
1611                                 rc = PTR_ERR(data_page);
1612                                 if (rc == -ENOMEM && !retried) {
1613                                         retried = true;
1614                                         rc = 0;
1615                                         goto retry_encrypt;
1616                                 }
1617                                 if (pa) {
1618                                         sptlrpc_enc_pool_put_pages_array(pa + i,
1619                                                                 page_count - i);
1620                                         OBD_FREE_PTR_ARRAY_LARGE(pa,
1621                                                                  page_count);
1622                                 }
1623                                 ptlrpc_request_free(req);
1624                                 RETURN(rc);
1625                         }
1626                         /* Set PageChecked flag on bounce page for
1627                          * disambiguation in osc_release_bounce_pages().
1628                          */
1629                         SetPageChecked(data_page);
1630                         brwpg->bp_page = data_page;
1631                         /* there should be no gap in the middle of the page array */
1632                         if (i == page_count - 1) {
1633                                 struct osc_async_page *oap =
1634                                         brw_page2oap(brwpg);
1635
1636                                 oa->o_size = oap->oap_count +
1637                                         oap->oap_obj_off + oap->oap_page_off;
1638                         }
1639                         /* len is forced to nunits, and the relative offset
1640                          * to 0, so store the old, clear-text values
1641                          */
1642                         brwpg->bp_count_diff = nunits - brwpg->bp_count;
1643                         brwpg->bp_count = nunits;
1644                         brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
1645                         brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
1646                 }
1647
1648                 if (pa)
1649                         OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
1650         } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1651                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1652                 struct cl_page *clpage = oap2cl_page(oap);
1653                 struct cl_object *clobj = clpage->cp_obj;
1654                 struct cl_attr attr = { 0 };
1655                 struct lu_env *env;
1656                 __u16 refcheck;
1657
1658                 env = cl_env_get(&refcheck);
1659                 if (IS_ERR(env)) {
1660                         rc = PTR_ERR(env);
1661                         ptlrpc_request_free(req);
1662                         RETURN(rc);
1663                 }
1664
1665                 cl_object_attr_lock(clobj);
1666                 rc = cl_object_attr_get(env, clobj, &attr);
1667                 cl_object_attr_unlock(clobj);
1668                 cl_env_put(env, &refcheck);
1669                 if (rc != 0) {
1670                         ptlrpc_request_free(req);
1671                         RETURN(rc);
1672                 }
1673                 if (attr.cat_size)
1674                         oa->o_size = attr.cat_size;
1675         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1676                    llcrypt_has_encryption_key(inode)) {
1677                 for (i = 0; i < page_count; i++) {
1678                         struct brw_page *pg = pga[i];
1679                         u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
1680
1681                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1682                         /* count/off are forced to cover the whole encryption
1683                          * unit size so that all encrypted data is stored on
1684                          * the OST; adjust bp_{count,off}_diff to remember the
1685                          * size of the clear text.
1686                          */
1687                         pg->bp_count_diff = nunits - pg->bp_count;
1688                         pg->bp_count = nunits;
1689                         pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
1690                         pg->bp_off = pg->bp_off & PAGE_MASK;
1691                 }
1692         }
1693
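        /* Count the niobuf_remote entries needed: pages that are contiguous
         * on the object (as decided by can_merge_pages()) share one entry.
         */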
1694         for (niocount = i = 1; i < page_count; i++) {
1695                 if (!can_merge_pages(pga[i - 1], pga[i]))
1696                         niocount++;
1697         }
1698
1699         pill = &req->rq_pill;
1700         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1701                              sizeof(*ioobj));
1702         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1703                              niocount * sizeof(*niobuf));
1704
1705         for (i = 0; i < page_count; i++) {
1706                 short_io_size += pga[i]->bp_count;
1707                 if (!inode || !IS_ENCRYPTED(inode) ||
1708                     !llcrypt_has_encryption_key(inode)) {
1709                         pga[i]->bp_count_diff = 0;
1710                         pga[i]->bp_off_diff = 0;
1711                 }
1712         }
1713
1714         if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1715                 enable_checksum = false;
1716                 short_io_size = 0;
1717                 gpu = 1;
1718         }
1719
1720         /* Check if read/write is small enough to be a short io. */
1721         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1722             !imp_connect_shortio(cli->cl_import))
1723                 short_io_size = 0;
1724
1725         /* If this is an empty RPC to an old server, just ignore it */
1726         if (!short_io_size && !pga[0]->bp_page) {
1727                 ptlrpc_request_free(req);
1728                 RETURN(-ENODATA);
1729         }
1730
1731         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1732                              opc == OST_READ ? 0 : short_io_size);
1733         if (opc == OST_READ)
1734                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1735                                      short_io_size);
1736
1737         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1738         if (rc) {
1739                 ptlrpc_request_free(req);
1740                 RETURN(rc);
1741         }
1742         osc_set_io_portal(req);
1743
1744         ptlrpc_at_set_req_timeout(req);
1745         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1746          * retry logic */
1747         req->rq_no_retry_einprogress = 1;
1748
1749         if (short_io_size != 0) {
1750                 desc = NULL;
1751                 short_io_buf = NULL;
1752                 goto no_bulk;
1753         }
1754
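        /* Bulk transfer: one descriptor covers all pages; the maximum number
         * of bulk MDs is derived from the negotiated ocd_brw_size.
         */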
1755         desc = ptlrpc_prep_bulk_imp(req, page_count,
1756                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1757                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1758                         PTLRPC_BULK_PUT_SINK),
1759                 OST_BULK_PORTAL,
1760                 &ptlrpc_bulk_kiov_pin_ops);
1761
1762         if (desc == NULL)
1763                 GOTO(out, rc = -ENOMEM);
1764         /* NB request now owns desc and will free it when it gets freed */
1765         desc->bd_is_rdma = gpu;
1766 no_bulk:
1767         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1768         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1769         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1770         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1771
1772         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1773
1774         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1775          * and from_kgid(), because these requests are asynchronous. Fortunately,
1776          * variable oa contains valid o_uid and o_gid in these two operations.
1777          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1778          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1779          * other processing logic */
1780         body->oa.o_uid = oa->o_uid;
1781         body->oa.o_gid = oa->o_gid;
1782
1783         obdo_to_ioobj(oa, ioobj);
1784         ioobj->ioo_bufcnt = niocount;
1785         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1786          * of bulks that might be sent for this request.  The actual number is
1787          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1788          * sends "max - 1" for old client compatibility sending "0", and also
1789          * so the actual maximum is a power-of-two number, not one less. LU-1431 */
1790         if (desc != NULL)
1791                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1792         else /* short io */
1793                 ioobj_max_brw_set(ioobj, 0);
1794
1795         if (inode && IS_ENCRYPTED(inode) &&
1796             llcrypt_has_encryption_key(inode) &&
1797             !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
1798                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1799                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1800                         body->oa.o_flags = 0;
1801                 }
1802                 body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
1803         }
1804
1805         if (short_io_size != 0) {
1806                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1807                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1808                         body->oa.o_flags = 0;
1809                 }
1810                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1811                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1812                        short_io_size);
1813                 if (opc == OST_WRITE) {
1814                         short_io_buf = req_capsule_client_get(pill,
1815                                                               &RMF_SHORT_IO);
1816                         LASSERT(short_io_buf != NULL);
1817                 }
1818         }
1819
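        /* Walk the page array: for a short I/O write, copy the data inline
         * into the request buffer; otherwise attach each page to the bulk
         * descriptor. Contiguous pages are merged into one niobuf_remote.
         */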
1820         LASSERT(page_count > 0);
1821         pg_prev = pga[0];
1822         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1823                 struct brw_page *pg = pga[i];
1824                 int poff = pg->bp_off & ~PAGE_MASK;
1825
1826                 LASSERT(pg->bp_count > 0);
1827                 /* make sure there is no gap in the middle of the page array */
1828                 LASSERTF(page_count == 1 ||
1829                          (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
1830                           ergo(i > 0 && i < page_count - 1,
1831                                poff == 0 && pg->bp_count == PAGE_SIZE)   &&
1832                           ergo(i == page_count - 1, poff == 0)),
1833                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1834                          i, page_count, pg, pg->bp_off, pg->bp_count);
1835                 LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
1836                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1837                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1838                          i, page_count,
1839                          pg->bp_page, page_private(pg->bp_page), pg->bp_page->index, pg->bp_off,
1840                          pg_prev->bp_page, page_private(pg_prev->bp_page),
1841                          pg_prev->bp_page->index, pg_prev->bp_off);
1842                 LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
1843                         (pg->bp_flag & OBD_BRW_SRVLOCK));
1844                 if (short_io_size != 0 && opc == OST_WRITE) {
1845                         unsigned char *ptr = kmap_atomic(pg->bp_page);
1846
1847                         LASSERT(short_io_size >= requested_nob + pg->bp_count);
1848                         memcpy(short_io_buf + requested_nob,
1849                                ptr + poff,
1850                                pg->bp_count);
1851                         kunmap_atomic(ptr);
1852                 } else if (short_io_size == 0) {
1853                         desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page, poff,
1854                                                          pg->bp_count);
1855                 }
1856                 requested_nob += pg->bp_count;
1857
1858                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1859                         niobuf--;
1860                         niobuf->rnb_len += pg->bp_count;
1861                 } else {
1862                         niobuf->rnb_offset = pg->bp_off;
1863                         niobuf->rnb_len    = pg->bp_count;
1864                         niobuf->rnb_flags  = pg->bp_flag;
1865                 }
1866                 pg_prev = pg;
1867         }
1868
1869         LASSERTF((void *)(niobuf - niocount) ==
1870                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1871                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1872                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1873
1874         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1875         if (resend) {
1876                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1877                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1878                         body->oa.o_flags = 0;
1879                 }
1880                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1881         }
1882
1883         if (osc_should_shrink_grant(cli))
1884                 osc_shrink_grant_local(cli, &body->oa);
1885
1886         if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1887                 enable_checksum = false;
1888
1889         /* size[REQ_REC_OFF] still sizeof (*body) */
1890         if (opc == OST_WRITE) {
1891                 if (enable_checksum) {
1892                         /* store cl_cksum_type in a local variable since
1893                          * it can be changed via lprocfs */
1894                         enum cksum_types cksum_type = cli->cl_cksum_type;
1895
1896                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1897                                 body->oa.o_flags = 0;
1898
1899                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1900                                                                 cksum_type);
1901                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1902
1903                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1904                                                   requested_nob, page_count,
1905                                                   pga, OST_WRITE,
1906                                                   &body->oa.o_cksum, resend);
1907                         if (rc < 0) {
1908                                 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1909                                        rc);
1910                                 GOTO(out, rc);
1911                         }
1912                         CDEBUG(D_PAGE | (resend ? D_HA : 0),
1913                                "checksum at write origin: %x (%x)\n",
1914                                body->oa.o_cksum, cksum_type);
1915
1916                         /* save this in 'oa', too, for later checking */
1917                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1918                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1919                                                            cksum_type);
1920                 } else {
1921                         /* clear out the checksum flag, in case this is a
1922                          * resend but cl_checksum is no longer set. b=11238 */
1923                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1924                 }
1925                 oa->o_cksum = body->oa.o_cksum;
1926                 /* 1 RC per niobuf */
1927                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1928                                      sizeof(__u32) * niocount);
1929         } else {
1930                 if (enable_checksum) {
1931                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1932                                 body->oa.o_flags = 0;
1933                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1934                                 cli->cl_cksum_type);
1935                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1936                 }
1937
1938                 /* The client cksum has already been copied to the wire obdo by
1939                  * the earlier lustre_set_wire_obdo(); if a bulk read is being
1940                  * resent due to a cksum error, this allows the server to
1941                  * check and dump the pages on its side */
1942         }
1943         ptlrpc_request_set_replen(req);
1944
1945         aa = ptlrpc_req_async_args(aa, req);
1946         aa->aa_oa = oa;
1947         aa->aa_requested_nob = requested_nob;
1948         aa->aa_nio_count = niocount;
1949         aa->aa_page_count = page_count;
1950         aa->aa_resends = 0;
1951         aa->aa_ppga = pga;
1952         aa->aa_cli = cli;
1953         INIT_LIST_HEAD(&aa->aa_oaps);
1954
1955         *reqp = req;
1956         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1957         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1958                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1959                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1960         RETURN(0);
1961
1962  out:
1963         ptlrpc_req_finished(req);
1964         RETURN(rc);
1965 }
1966
1967 char dbgcksum_file_name[PATH_MAX];
1968
1969 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1970                                 struct brw_page **pga, __u32 server_cksum,
1971                                 __u32 client_cksum)
1972 {
1973         struct file *filp;
1974         int rc, i;
1975         unsigned int len;
1976         char *buf;
1977
1978         /* only keep a dump of the pages on the first error for a given range
1979          * in the file/fid, not on the resends/retries. */
1980         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1981                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1982                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1983                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1984                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1985                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1986                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1987                  pga[0]->bp_off,
1988                  pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
1989                  client_cksum, server_cksum);
1990         CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
1991         filp = filp_open(dbgcksum_file_name,
1992                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1993         if (IS_ERR(filp)) {
1994                 rc = PTR_ERR(filp);
1995                 if (rc == -EEXIST)
1996                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1997                                "checksum error: rc = %d\n", dbgcksum_file_name,
1998                                rc);
1999                 else
2000                         CERROR("%s: can't open to dump pages with checksum "
2001                                "error: rc = %d\n", dbgcksum_file_name, rc);
2002                 return;
2003         }
2004
2005         for (i = 0; i < page_count; i++) {
2006                 len = pga[i]->bp_count;
2007                 buf = kmap(pga[i]->bp_page);
2008                 while (len != 0) {
2009                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
2010                         if (rc < 0) {
2011                                 CERROR("%s: wanted to write %u but got %d "
2012                                        "error\n", dbgcksum_file_name, len, rc);
2013                                 break;
2014                         }
2015                         len -= rc;
2016                         buf += rc;
2017                 }
2018                 kunmap(pga[i]->bp_page);
2019         }
2020
2021         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
2022         if (rc)
2023                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
2024         filp_close(filp, NULL);
2025
2026         libcfs_debug_dumplog();
2027 }
2028
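/*
 * Compare the client-side and server-side checksums of a bulk write; on
 * mismatch, recompute the checksum locally to guess where the data was
 * modified. Returns 0 if the checksums match, and 1 on a genuine
 * mismatch, in which case the caller resends the request.
 */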
2029 static int
2030 check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
2031                      __u32 client_cksum, __u32 server_cksum,
2032                      struct osc_brw_async_args *aa)
2033 {
2034         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
2035         enum cksum_types cksum_type;
2036         obd_dif_csum_fn *fn = NULL;
2037         int sector_size = 0;
2038         __u32 new_cksum;
2039         char *msg;
2040         int rc;
2041
2042         if (server_cksum == client_cksum) {
2043                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2044                 return 0;
2045         }
2046
2047         if (aa->aa_cli->cl_checksum_dump)
2048                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
2049                                     server_cksum, client_cksum);
2050
2051         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
2052                                            oa->o_flags : 0);
2053
2054         switch (cksum_type) {
2055         case OBD_CKSUM_T10IP512:
2056                 fn = obd_dif_ip_fn;
2057                 sector_size = 512;
2058                 break;
2059         case OBD_CKSUM_T10IP4K:
2060                 fn = obd_dif_ip_fn;
2061                 sector_size = 4096;
2062                 break;
2063         case OBD_CKSUM_T10CRC512:
2064                 fn = obd_dif_crc_fn;
2065                 sector_size = 512;
2066                 break;
2067         case OBD_CKSUM_T10CRC4K:
2068                 fn = obd_dif_crc_fn;
2069                 sector_size = 4096;
2070                 break;
2071         default:
2072                 break;
2073         }
2074
2075         if (fn)
2076                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
2077                                              aa->aa_page_count, aa->aa_ppga,
2078                                              OST_WRITE, fn, sector_size,
2079                                              &new_cksum, true);
2080         else
2081                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
2082                                        aa->aa_ppga, OST_WRITE, cksum_type,
2083                                        &new_cksum);
2084
2085         if (rc < 0)
2086                 msg = "failed to calculate the client write checksum";
2087         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
2088                 msg = "the server did not use the checksum type specified in "
2089                       "the original request - likely a protocol problem";
2090         else if (new_cksum == server_cksum)
2091                 msg = "changed on the client after we checksummed it - "
2092                       "likely false positive due to mmap IO (bug 11742)";
2093         else if (new_cksum == client_cksum)
2094                 msg = "changed in transit before arrival at OST";
2095         else
2096                 msg = "changed in transit AND doesn't match the original - "
2097                       "likely false positive due to mmap IO (bug 11742)";
2098
2099         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
2100                            DFID " object "DOSTID" extent [%llu-%llu], original "
2101                            "client csum %x (type %x), server csum %x (type %x),"
2102                            " client csum now %x\n",
2103                            obd_name, msg, libcfs_nidstr(&peer->nid),
2104                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
2105                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
2106                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
2107                            POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off,
2108                            aa->aa_ppga[aa->aa_page_count - 1]->bp_off +
2109                                 aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1,
2110                            client_cksum,
2111                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
2112                            server_cksum, cksum_type, new_cksum);
2113         return 1;
2114 }
2115
2116 /* Note: rc enters this function as the number of bytes transferred */
2117 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
2118 {
2119         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
2120         struct client_obd *cli = aa->aa_cli;
2121         const char *obd_name = cli->cl_import->imp_obd->obd_name;
2122         const struct lnet_processid *peer =
2123                 &req->rq_import->imp_connection->c_peer;
2124         struct ost_body *body;
2125         u32 client_cksum = 0;
2126         struct inode *inode = NULL;
2127         unsigned int blockbits = 0, blocksize = 0;
2128         struct cl_page *clpage;
2129
2130         ENTRY;
2131
2132         if (rc < 0 && rc != -EDQUOT) {
2133                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
2134                 RETURN(rc);
2135         }
2136
2137         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
2138         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
2139         if (body == NULL) {
2140                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
2141                 RETURN(-EPROTO);
2142         }
2143
2144         /* set/clear over quota flag for a uid/gid/projid */
2145         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
2146             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
2147                 unsigned qid[LL_MAXQUOTAS] = {
2148                                          body->oa.o_uid, body->oa.o_gid,
2149                                          body->oa.o_projid };
2150                 CDEBUG(D_QUOTA,
2151                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
2152                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
2153                        body->oa.o_valid, body->oa.o_flags);
2154                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2155                                 body->oa.o_flags);
2156         }
2157
2158         osc_update_grant(cli, body);
2159
2160         if (rc < 0)
2161                 RETURN(rc);
2162
2163         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2164                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
2165
2166         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2167                 if (rc > 0) {
2168                         CERROR("%s: unexpected positive size %d\n",
2169                                obd_name, rc);
2170                         RETURN(-EPROTO);
2171                 }
2172
2173                 if (req->rq_bulk != NULL &&
2174                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2175                         RETURN(-EAGAIN);
2176
2177                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2178                     check_write_checksum(&body->oa, peer, client_cksum,
2179                                          body->oa.o_cksum, aa))
2180                         RETURN(-EAGAIN);
2181
2182                 rc = check_write_rcs(req, aa->aa_requested_nob,
2183                                      aa->aa_nio_count, aa->aa_page_count,
2184                                      aa->aa_ppga);
2185                 GOTO(out, rc);
2186         }
2187
2188         /* The rest of this function executes only for OST_READs */
2189
2190         if (req->rq_bulk == NULL) {
2191                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2192                                           RCL_SERVER);
2193                 LASSERT(rc == req->rq_status);
2194         } else {
2195                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2196                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2197         }
2198         if (rc < 0)
2199                 GOTO(out, rc = -EAGAIN);
2200
2201         if (rc > aa->aa_requested_nob) {
2202                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2203                        rc, aa->aa_requested_nob);
2204                 RETURN(-EPROTO);
2205         }
2206
2207         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2208                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2209                        rc, req->rq_bulk->bd_nob_transferred);
2210                 RETURN(-EPROTO);
2211         }
2212
2213         if (req->rq_bulk == NULL) {
2214                 /* short io */
2215                 int nob, pg_count, i = 0;
2216                 unsigned char *buf;
2217
2218                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2219                 pg_count = aa->aa_page_count;
2220                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2221                                                    rc);
2222                 nob = rc;
2223                 while (nob > 0 && pg_count > 0) {
2224                         unsigned char *ptr;
2225                         int count = aa->aa_ppga[i]->bp_count > nob ?
2226                                     nob : aa->aa_ppga[i]->bp_count;
2227
2228                         CDEBUG(D_CACHE, "page %p count %d\n",
2229                                aa->aa_ppga[i]->bp_page, count);
2230                         ptr = kmap_atomic(aa->aa_ppga[i]->bp_page);
2231                         memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf,
2232                                count);
2233                         kunmap_atomic((void *) ptr);
2234
2235                         buf += count;
2236                         nob -= count;
2237                         i++;
2238                         pg_count--;
2239                 }
2240         }
2241
2242         if (rc < aa->aa_requested_nob)
2243                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2244
2245         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2246                 static int cksum_counter;
2247                 u32 server_cksum = body->oa.o_cksum;
2248                 int nob = rc;
2249                 char *via = "";
2250                 char *router = "";
2251                 enum cksum_types cksum_type;
2252                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2253                         body->oa.o_flags : 0;
2254
2255                 cksum_type = obd_cksum_type_unpack(o_flags);
2256                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2257                                           aa->aa_page_count, aa->aa_ppga,
2258                                           OST_READ, &client_cksum, false);
2259                 if (rc < 0)
2260                         GOTO(out, rc);
2261
2262                 if (req->rq_bulk != NULL &&
2263                     !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
2264                         via = " via ";
2265                         router = libcfs_nidstr(&req->rq_bulk->bd_sender);
2266                 }
2267
2268                 if (server_cksum != client_cksum) {
2269                         struct ost_body *clbody;
2270                         __u32 client_cksum2;
2271                         u32 page_count = aa->aa_page_count;
2272
2273                         osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2274                                              page_count, aa->aa_ppga,
2275                                              OST_READ, &client_cksum2, true);
2276                         clbody = req_capsule_client_get(&req->rq_pill,
2277                                                         &RMF_OST_BODY);
2278                         if (cli->cl_checksum_dump)
2279                                 dump_all_bulk_pages(&clbody->oa, page_count,
2280                                                     aa->aa_ppga, server_cksum,
2281                                                     client_cksum);
2282
2283                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2284                                            "%s%s%s inode "DFID" object "DOSTID
2285                                            " extent [%llu-%llu], client %x/%x, "
2286                                            "server %x, cksum_type %x\n",
2287                                            obd_name,
2288                                            libcfs_nidstr(&peer->nid),
2289                                            via, router,
2290                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2291                                                 clbody->oa.o_parent_seq : 0ULL,
2292                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2293                                                 clbody->oa.o_parent_oid : 0,
2294                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2295                                                 clbody->oa.o_parent_ver : 0,
2296                                            POSTID(&body->oa.o_oi),
2297                                            aa->aa_ppga[0]->bp_off,
2298                                            aa->aa_ppga[page_count-1]->bp_off +
2299                                            aa->aa_ppga[page_count-1]->bp_count - 1,
2300                                            client_cksum, client_cksum2,
2301                                            server_cksum, cksum_type);
2302                         cksum_counter = 0;
2303                         aa->aa_oa->o_cksum = client_cksum;
2304                         rc = -EAGAIN;
2305                 } else {
2306                         cksum_counter++;
2307                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2308                         rc = 0;
2309                 }
2310         } else if (unlikely(client_cksum)) {
2311                 static int cksum_missed;
2312
2313                 cksum_missed++;
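                /* log only when cksum_missed is a power of two, to
                 * rate-limit the console error messages
                 */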
2314                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2315                         CERROR("%s: checksum %u requested from %s but not sent\n",
2316                                obd_name, cksum_missed,
2317                                libcfs_nidstr(&peer->nid));
2318         } else {
2319                 rc = 0;
2320         }
2321
2322         /* get the inode from the first cl_page */
2323         clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
2324         inode = clpage->cp_inode;
2325         if (clpage->cp_type == CPT_TRANSIENT && inode) {
2326                 blockbits = inode->i_blkbits;
2327                 blocksize = 1 << blockbits;
2328         }
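        /* For encrypted files, decrypt the pages in place now that the
         * cipher text has arrived. An all-zero encryption unit carries no
         * data, so it is not decrypted; if the whole page is empty this is
         * signalled to upper layers by clearing PagePrivate2.
         */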
2329         if (inode && IS_ENCRYPTED(inode)) {
2330                 int idx;
2331
2332                 if (!llcrypt_has_encryption_key(inode)) {
2333                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2334                         GOTO(out, rc);
2335                 }
2336                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2337                         struct brw_page *brwpg = aa->aa_ppga[idx];
2338                         unsigned int offs = 0;
2339
2340                         while (offs < PAGE_SIZE) {
2341                                 /* do not decrypt if page is all 0s */
2342                                 if (memchr_inv(page_address(brwpg->bp_page) + offs,
2343                                       0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2344                                         /* if page is empty forward info to
2345                                          * upper layers (ll_io_zero_page) by
2346                                          * clearing PagePrivate2
2347                                          */
2348                                         if (!offs)
2349                                                 ClearPagePrivate2(brwpg->bp_page);
2350                                         break;
2351                                 }
2352
2353                                 if (blockbits) {
2354                                         /* This is direct IO case. Directly call
2355                                          * decrypt function that takes inode as
2356                                          * input parameter. Page does not need
2357                                          * to be locked.
2358                                          */
2359                                         u64 lblk_num;
2360                                         unsigned int i;
2361
2362                                         clpage =
2363                                                oap2cl_page(brw_page2oap(brwpg));
2364                                         lblk_num =
2365                                                 ((u64)(clpage->cp_page_index) <<
2366                                                 (PAGE_SHIFT - blockbits)) +
2367                                                 (offs >> blockbits);
2368                                         for (i = offs;
2369                                              i < offs +
2370                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2371                                              i += blocksize, lblk_num++) {
2372                                                 rc =
2373                                                   llcrypt_decrypt_block_inplace(
2374                                                           inode, brwpg->bp_page,
2375                                                           blocksize, i,
2376                                                           lblk_num);
2377                                                 if (rc)
2378                                                         break;
2379                                         }
2380                                 } else {
2381                                         rc = llcrypt_decrypt_pagecache_blocks(
2382                                                 brwpg->bp_page,
2383                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2384                                                 offs);
2385                                 }
2386                                 if (rc)
2387                                         GOTO(out, rc);
2388
2389                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2390                         }
2391                 }
2392         }
2393
2394 out:
2395         if (rc >= 0)
2396                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2397                                      aa->aa_oa, &body->oa);
2398
2399         RETURN(rc);
2400 }
2401
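/*
 * Resend a failed bulk RPC: build a new request from the pages and obdo
 * of the old one, move the async args, oaps and extents over to it, and
 * queue it on ptlrpcd with a resend delay capped at the request timeout.
 */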
2402 static int osc_brw_redo_request(struct ptlrpc_request *request,
2403                                 struct osc_brw_async_args *aa, int rc)
2404 {
2405         struct ptlrpc_request *new_req;
2406         struct osc_brw_async_args *new_aa;
2407         struct osc_async_page *oap;
2408         ENTRY;
2409
2410         /* The message below is checked in replay-ost-single.sh test_8ae */
2411         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2412                   "redo for recoverable error %d", rc);
2413
2414         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2415                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2416                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2417                                   aa->aa_ppga, &new_req, 1);
2418         if (rc)
2419                 RETURN(rc);
2420
2421         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2422                 if (oap->oap_request != NULL) {
2423                         LASSERTF(request == oap->oap_request,
2424                                  "request %p != oap_request %p\n",
2425                                  request, oap->oap_request);
2426                 }
2427         }
2428         /*
2429          * New request takes over pga and oaps from old request.
2430          * Note that copying a list_head doesn't work, need to move it...
2431          */
2432         aa->aa_resends++;
2433         new_req->rq_interpret_reply = request->rq_interpret_reply;
2434         new_req->rq_async_args = request->rq_async_args;
2435         new_req->rq_commit_cb = request->rq_commit_cb;
2436         /* cap the resend delay to the current request timeout; this is similar
2437          * to what ptlrpc does (see after_reply()) */
2438         if (aa->aa_resends > new_req->rq_timeout)
2439                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2440         else
2441                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2442         new_req->rq_generation_set = 1;
2443         new_req->rq_import_generation = request->rq_import_generation;
2444
2445         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2446
2447         INIT_LIST_HEAD(&new_aa->aa_oaps);
2448         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2449         INIT_LIST_HEAD(&new_aa->aa_exts);
2450         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2451         new_aa->aa_resends = aa->aa_resends;
2452
2453         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2454                 if (oap->oap_request) {
2455                         ptlrpc_req_finished(oap->oap_request);
2456                         oap->oap_request = ptlrpc_request_addref(new_req);
2457                 }
2458         }
2459
2460         /* XXX: This code will run into problems if we ever support adding
2461          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2462          * waiting for all of them to finish. We should inherit the request
2463          * set from the old request. */
2464         ptlrpcd_add_req(new_req);
2465
2466         DEBUG_REQ(D_INFO, new_req, "new request");
2467         RETURN(0);
2468 }
2469
2470 /*
2471  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2472  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2473  * fine for our small page arrays and doesn't require allocation.  It's an
2474  * insertion sort that swaps elements that are strides apart, shrinking the
2475  * stride down until it's 1 and the array is sorted.
2476  */
2477 static void sort_brw_pages(struct brw_page **array, int num)
2478 {
2479         int stride, i, j;
2480         struct brw_page *tmp;
2481
2482         if (num == 1)
2483                 return;
2484         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2485                 ;
2486
2487         do {
2488                 stride /= 3;
2489                 for (i = stride ; i < num ; i++) {
2490                         tmp = array[i];
2491                         j = i;
2492                         while (j >= stride && array[j - stride]->bp_off > tmp->bp_off) {
2493                                 array[j] = array[j - stride];
2494                                 j -= stride;
2495                         }
2496                         array[j] = tmp;
2497                 }
2498         } while (stride > 1);
2499 }
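/*
 * Illustrative example only: for num = 32 the stride generator above stops
 * at stride 40 (1, 4, 13, 40), and the sorting passes then run with
 * strides 13, 4 and 1.
 */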
2500
2501 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2502 {
2503         LASSERT(ppga != NULL);
2504         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2505 }
2506
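/*
 * Completion callback for bulk read/write RPCs: restore the clear-text
 * pages, resend on recoverable errors, propagate size/blocks/time
 * attributes from the reply to the cl_object, finish the extents and
 * wake up any cache waiters.
 */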
2507 static int brw_interpret(const struct lu_env *env,
2508                          struct ptlrpc_request *req, void *args, int rc)
2509 {
2510         struct osc_brw_async_args *aa = args;
2511         struct osc_extent *ext;
2512         struct osc_extent *tmp;
2513         struct client_obd *cli = aa->aa_cli;
2514         unsigned long transferred = 0;
2515         struct cl_object *obj = NULL;
2516
2517         ENTRY;
2518
2519         rc = osc_brw_fini_request(req, rc);
2520         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2521
2522         /* restore clear text pages */
2523         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2524
2525         /*
2526          * When server returns -EINPROGRESS, client should always retry
2527          * regardless of the number of times the bulk was resent already.
2528          */
2529         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2530                 if (req->rq_import_generation !=
2531                     req->rq_import->imp_generation) {
2532                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2533                                ""DOSTID", rc = %d.\n",
2534                                req->rq_import->imp_obd->obd_name,
2535                                POSTID(&aa->aa_oa->o_oi), rc);
2536                 } else if (rc == -EINPROGRESS ||
2537                            client_should_resend(aa->aa_resends, aa->aa_cli)) {
2538                         rc = osc_brw_redo_request(req, aa, rc);
2539                 } else {
2540                         CERROR("%s: too many resent retries for object: "
2541                                "%llu:%llu, rc = %d.\n",
2542                                req->rq_import->imp_obd->obd_name,
2543                                POSTID(&aa->aa_oa->o_oi), rc);
2544                 }
2545
2546                 if (rc == 0)
2547                         RETURN(0);
2548                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2549                         rc = -EIO;
2550         }
2551
2552         if (rc == 0) {
2553                 struct obdo *oa = aa->aa_oa;
2554                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2555                 unsigned long valid = 0;
2556                 struct osc_async_page *last;
2557
2558                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2559                 obj = osc2cl(last->oap_obj);
2560
2561                 cl_object_attr_lock(obj);
2562                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2563                         attr->cat_blocks = oa->o_blocks;
2564                         valid |= CAT_BLOCKS;
2565                 }
2566                 if (oa->o_valid & OBD_MD_FLMTIME) {
2567                         attr->cat_mtime = oa->o_mtime;
2568                         valid |= CAT_MTIME;
2569                 }
2570                 if (oa->o_valid & OBD_MD_FLATIME) {
2571                         attr->cat_atime = oa->o_atime;
2572                         valid |= CAT_ATIME;
2573                 }
2574                 if (oa->o_valid & OBD_MD_FLCTIME) {
2575                         attr->cat_ctime = oa->o_ctime;
2576                         valid |= CAT_CTIME;
2577                 }
2578
2579                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2580                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2581                         loff_t last_off = last->oap_count + last->oap_obj_off +
2582                                 last->oap_page_off;
2583
2584                         /* Change the file size if this is an out-of-quota or
2585                          * direct I/O write and it extends the file size */
2586                         if (loi->loi_lvb.lvb_size < last_off) {
2587                                 attr->cat_size = last_off;
2588                                 valid |= CAT_SIZE;
2589                         }
2590                         /* Extend KMS if it's not a lockless write */
2591                         if (loi->loi_kms < last_off &&
2592                             oap2osc_page(last)->ops_srvlock == 0) {
2593                                 attr->cat_kms = last_off;
2594                                 valid |= CAT_KMS;
2595                         }
2596                 }
2597
2598                 if (valid != 0)
2599                         cl_object_attr_update(env, obj, attr, valid);
2600                 cl_object_attr_unlock(obj);
2601         }
2602         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2603         aa->aa_oa = NULL;
2604
2605         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) {
2606                 osc_inc_unstable_pages(req);
2607                 /*
2608                  * If req->rq_committed is set, it means that the dirty pages
2609                  * have already committed into the stable storage on OSTs
2610                  * (i.e. Direct I/O).
2611                  */
2612                 if (!req->rq_committed)
2613                         cl_object_dirty_for_sync(env, cl_object_top(obj));
2614         }
2615
2616         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2617                 list_del_init(&ext->oe_link);
2618                 osc_extent_finish(env, ext, 1,
2619                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2620         }
2621         LASSERT(list_empty(&aa->aa_exts));
2622         LASSERT(list_empty(&aa->aa_oaps));
2623
2624         transferred = (req->rq_bulk == NULL ? /* short io */
2625                        aa->aa_requested_nob :
2626                        req->rq_bulk->bd_nob_transferred);
2627
2628         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2629         ptlrpc_lprocfs_brw(req, transferred);
2630
2631         spin_lock(&cli->cl_loi_list_lock);
2632         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2633          * is called, so we know whether to fall back to sync BRWs or wait
2634          * for more RPCs to complete */
2635         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2636                 cli->cl_w_in_flight--;
2637         else
2638                 cli->cl_r_in_flight--;
2639         osc_wake_cache_waiters(cli);
2640         spin_unlock(&cli->cl_loi_list_lock);
2641
2642         osc_io_unplug(env, cli, NULL);
2643         RETURN(rc);
2644 }
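/*
 * Hedged worked example (illustrative numbers, not from the original file)
 * for the size/KMS update in the OST_WRITE branch of brw_interpret() above:
 * if the last page of the RPC maps object offset 1 MiB (oap_obj_off), with
 * oap_page_off 0 and oap_count PAGE_SIZE, then
 *
 *	last_off = oap_count + oap_obj_off + oap_page_off
 *	         = PAGE_SIZE + 1 MiB + 0
 *
 * and both lvb_size and the known minimal size (KMS), if smaller, are
 * extended to last_off (KMS only when the write was not lockless).
 */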
2645
2646 static void brw_commit(struct ptlrpc_request *req)
2647 {
2648         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2649          * this callback (invoked via rq_commit_cb), we need to ensure
2650          * osc_dec_unstable_pages is still called; otherwise unstable
2651          * pages may be leaked. */
2652         spin_lock(&req->rq_lock);
2653         if (likely(req->rq_unstable)) {
2654                 req->rq_unstable = 0;
2655                 spin_unlock(&req->rq_lock);
2656
2657                 osc_dec_unstable_pages(req);
2658         } else {
2659                 req->rq_committed = 1;
2660                 spin_unlock(&req->rq_lock);
2661         }
2662 }
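/*
 * Hedged sketch (not part of the original file): brw_commit() above and the
 * osc_inc_unstable_pages() call in brw_interpret() form an "exactly one side
 * cleans up" handshake around rq_unstable/rq_committed.  A minimal generic
 * version of the same pattern, with hypothetical names (struct my_req,
 * my_cleanup):
 */
#if 0
static void my_racy_callback(struct my_req *req)
{
	bool do_cleanup = false;

	spin_lock(&req->lock);
	if (req->armed) {		/* the other side already ran setup */
		req->armed = false;	/* claim the cleanup exactly once */
		do_cleanup = true;
	} else {
		req->committed = true;	/* tell the other side to skip setup */
	}
	spin_unlock(&req->lock);

	if (do_cleanup)
		my_cleanup(req);	/* runs at most once, without the lock */
}
#endif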
2663
2664 /**
2665  * Build an RPC from the list of extents @ext_list. The caller must ensure
2666  * that the total pages in this list do not exceed the max pages per RPC.
2667  * Extents in the list must be in OES_RPC state.
2668  */
2669 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2670                   struct list_head *ext_list, int cmd)
2671 {
2672         struct ptlrpc_request           *req = NULL;
2673         struct osc_extent               *ext;
2674         struct brw_page                 **pga = NULL;
2675         struct osc_brw_async_args       *aa = NULL;
2676         struct obdo                     *oa = NULL;
2677         struct osc_async_page           *oap;
2678         struct osc_object               *obj = NULL;
2679         struct cl_req_attr              *crattr = NULL;
2680         loff_t                          starting_offset = OBD_OBJECT_EOF;
2681         loff_t                          ending_offset = 0;
2682         /* '1' for consistency with code that checks !mpflag to restore */
2683         int mpflag = 1;
2684         int                             mem_tight = 0;
2685         int                             page_count = 0;
2686         bool                            soft_sync = false;
2687         bool                            ndelay = false;
2688         int                             i;
2689         int                             grant = 0;
2690         int                             rc;
2691         __u32                           layout_version = 0;
2692         LIST_HEAD(rpc_list);
2693         struct ost_body                 *body;
2694         ENTRY;
2695         LASSERT(!list_empty(ext_list));
2696
2697         /* add pages into rpc_list to build BRW rpc */
2698         list_for_each_entry(ext, ext_list, oe_link) {
2699                 LASSERT(ext->oe_state == OES_RPC);
2700                 mem_tight |= ext->oe_memalloc;
2701                 grant += ext->oe_grants;
2702                 page_count += ext->oe_nr_pages;
2703                 layout_version = max(layout_version, ext->oe_layout_version);
2704                 if (obj == NULL)
2705                         obj = ext->oe_obj;
2706         }
2707
2708         soft_sync = osc_over_unstable_soft_limit(cli);
2709         if (mem_tight)
2710                 mpflag = memalloc_noreclaim_save();
2711
2712         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2713         if (pga == NULL)
2714                 GOTO(out, rc = -ENOMEM);
2715
2716         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2717         if (oa == NULL)
2718                 GOTO(out, rc = -ENOMEM);
2719
2720         i = 0;
2721         list_for_each_entry(ext, ext_list, oe_link) {
2722                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2723                         if (mem_tight)
2724                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2725                         if (soft_sync)
2726                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2727                         pga[i] = &oap->oap_brw_page;
2728                         pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off;
2729                         i++;
2730
2731                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2732                         if (starting_offset == OBD_OBJECT_EOF ||
2733                             starting_offset > oap->oap_obj_off) {
2734                                 starting_offset = oap->oap_obj_off;
2735                         } else {
2736                                 CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n",
2737                                        i, oap->oap_obj_off, oap->oap_page_off);
2738                                 LASSERT(oap->oap_page_off == 0);
2739                         }
2740                         if (ending_offset < oap->oap_obj_off + oap->oap_count) {
2741                                 ending_offset = oap->oap_obj_off +
2742                                                 oap->oap_count;
2743                         } else {
2744                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2745                                         PAGE_SIZE);
2746                         }
2747                 }
2748                 if (ext->oe_ndelay)
2749                         ndelay = true;
2750         }
2751
2752         /* first page in the list */
2753         oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2754
2755         crattr = &osc_env_info(env)->oti_req_attr;
2756         memset(crattr, 0, sizeof(*crattr));
2757         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2758         crattr->cra_flags = ~0ULL;
2759         crattr->cra_page = oap2cl_page(oap);
2760         crattr->cra_oa = oa;
2761         cl_req_attr_set(env, osc2cl(obj), crattr);
2762
2763         if (cmd == OBD_BRW_WRITE) {
2764                 oa->o_grant_used = grant;
2765                 if (layout_version > 0) {
2766                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2767                                PFID(&oa->o_oi.oi_fid), layout_version);
2768
2769                         oa->o_layout_version = layout_version;
2770                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2771                 }
2772         }
2773
2774         sort_brw_pages(pga, page_count);
2775         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2776         if (rc != 0) {
2777                 CERROR("prep_req failed: %d\n", rc);
2778                 GOTO(out, rc);
2779         }
2780
2781         req->rq_commit_cb = brw_commit;
2782         req->rq_interpret_reply = brw_interpret;
2783         req->rq_memalloc = mem_tight != 0;
2784         oap->oap_request = ptlrpc_request_addref(req);
2785         if (ndelay) {
2786                 req->rq_no_resend = req->rq_no_delay = 1;
2787                 /* We could probably set a shorter timeout value here,
2788                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2789                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2790         }
2791
2792         /* Need to update the timestamps after the request is built in case
2793          * we race with setattr (locally or queued at the OST).  If the OST
2794          * gets a later setattr before an earlier BRW (as determined by the
2795          * request xid), the OST will not use the BRW timestamps.  Sadly,
2796          * there is no obvious way to do this in a single call.  bug 10150 */
2797         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2798         crattr->cra_oa = &body->oa;
2799         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2800         cl_req_attr_set(env, osc2cl(obj), crattr);
2801         lustre_msg_set_uid_gid(req->rq_reqmsg, &crattr->cra_uid,
2802                                &crattr->cra_gid);
2803         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2804
2805         aa = ptlrpc_req_async_args(aa, req);
2806         INIT_LIST_HEAD(&aa->aa_oaps);
2807         list_splice_init(&rpc_list, &aa->aa_oaps);
2808         INIT_LIST_HEAD(&aa->aa_exts);
2809         list_splice_init(ext_list, &aa->aa_exts);
2810
2811         spin_lock(&cli->cl_loi_list_lock);
2812         starting_offset >>= PAGE_SHIFT;
2813         ending_offset >>= PAGE_SHIFT;
2814         if (cmd == OBD_BRW_READ) {
2815                 cli->cl_r_in_flight++;
2816                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2817                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2818                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2819                                       starting_offset + 1);
2820         } else {
2821                 cli->cl_w_in_flight++;
2822                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2823                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2824                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2825                                       starting_offset + 1);
2826         }
2827         spin_unlock(&cli->cl_loi_list_lock);
2828
2829         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2830                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2831         if (libcfs_debug & D_IOTRACE) {
2832                 struct lu_fid fid;
2833
2834                 fid.f_seq = crattr->cra_oa->o_parent_seq;
2835                 fid.f_oid = crattr->cra_oa->o_parent_oid;
2836                 fid.f_ver = crattr->cra_oa->o_parent_ver;
2837                 CDEBUG(D_IOTRACE,
2838                        DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
2839                        PFID(&fid), page_count,
2840                        cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
2841                        ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
2842         }
2843         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2844
2845         ptlrpcd_add_req(req);
2846         rc = 0;
2847         EXIT;
2848
2849 out:
2850         if (mem_tight)
2851                 memalloc_noreclaim_restore(mpflag);
2852
2853         if (rc != 0) {
2854                 LASSERT(req == NULL);
2855
2856                 if (oa)
2857                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2858                 if (pga) {
2859                         osc_release_bounce_pages(pga, page_count);
2860                         osc_release_ppga(pga, page_count);
2861                 }
2862                 /* this should happen rarely and is pretty bad; it makes the
2863                  * pending list not follow the dirty order
2864                  */
2865                 while ((ext = list_first_entry_or_null(ext_list,
2866                                                        struct osc_extent,
2867                                                        oe_link)) != NULL) {
2868                         list_del_init(&ext->oe_link);
2869                         osc_extent_finish(env, ext, 0, rc);
2870                 }
2871         }
2872         RETURN(rc);
2873 }
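/*
 * Hedged usage sketch (not part of the original file): a caller of
 * osc_build_rpc() gathers extents that are already in OES_RPC state and
 * caps the total page count, as the comment above requires.  The selector
 * my_pick_extent() and the limit max_pages are hypothetical stand-ins for
 * the real pending-list logic:
 */
#if 0
	LIST_HEAD(ext_list);
	struct osc_extent *ext;
	int pages = 0;

	while ((ext = my_pick_extent(obj)) != NULL) {
		if (pages + ext->oe_nr_pages > max_pages)
			break;		/* stay under max pages per RPC */
		pages += ext->oe_nr_pages;
		list_add_tail(&ext->oe_link, &ext_list);
	}
	if (!list_empty(&ext_list))
		rc = osc_build_rpc(env, cli, &ext_list, OBD_BRW_WRITE);
#endif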
2874
2875 /* This is to refresh our lock in face of no RPCs. */
2876 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2877 {
2878         struct ptlrpc_request *req;
2879         struct obdo oa;
2880         struct brw_page bpg = { .bp_off = start, .bp_count = 1};
2881         struct brw_page *pga = &bpg;
2882         int rc;
2883
2884         memset(&oa, 0, sizeof(oa));
2885         oa.o_oi = osc->oo_oinfo->loi_oi;
2886         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2887         /* For updated servers - don't do a read */
2888         oa.o_flags = OBD_FL_NORPC;
2889
2890         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2891                                   &req, 0);
2892
2893         /* If we succeeded, we ship it off; if not, there's no point in
2894          * doing anything. Also no resends.
2895          * No interpret callback, no commit callback.
2896          */
2897         if (!rc) {
2898                 req->rq_no_resend = 1;
2899                 ptlrpcd_add_req(req);
2900         }
2901 }
2902
2903 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2904 {
2905         int set = 0;
2906
2907         LASSERT(lock != NULL);
2908
2909         lock_res_and_lock(lock);
2910
2911         if (lock->l_ast_data == NULL)
2912                 lock->l_ast_data = data;
2913         if (lock->l_ast_data == data)
2914                 set = 1;
2915
2916         unlock_res_and_lock(lock);
2917
2918         return set;
2919 }
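/*
 * Hedged usage note (not part of the original file): osc_set_lock_data()
 * is an idempotent set-or-verify helper; callers below use its return
 * value to decide whether a matched lock really belongs to their object:
 */
#if 0
	if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
		/* l_ast_data is ours (or was already ours): reuse the lock */
	} else {
		/* bound to another object: drop the match reference */
		ldlm_lock_decref(&lockh, mode);
	}
#endif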
2920
2921 static int osc_enqueue_fini(struct ptlrpc_request *req,
2922                             osc_enqueue_upcall_f upcall,
2923                             void *cookie, struct lustre_handle *lockh,
2924                             enum ldlm_mode mode, __u64 *flags,
2925                             bool speculative, int errcode)
2926 {
2927         bool intent = *flags & LDLM_FL_HAS_INTENT;
2928         int rc;
2929         ENTRY;
2930
2931         /* The request was created before ldlm_cli_enqueue call. */
2932         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2933                 struct ldlm_reply *rep;
2934
2935                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2936                 LASSERT(rep != NULL);
2937
2938                 rep->lock_policy_res1 =
2939                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2940                 if (rep->lock_policy_res1)
2941                         errcode = rep->lock_policy_res1;
2942                 if (!speculative)
2943                         *flags |= LDLM_FL_LVB_READY;
2944         } else if (errcode == ELDLM_OK) {
2945                 *flags |= LDLM_FL_LVB_READY;
2946         }
2947
2948         /* Call the update callback. */
2949         rc = (*upcall)(cookie, lockh, errcode);
2950
2951         /* release the reference taken in ldlm_cli_enqueue() */
2952         if (errcode == ELDLM_LOCK_MATCHED)
2953                 errcode = ELDLM_OK;
2954         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2955                 ldlm_lock_decref(lockh, mode);
2956
2957         RETURN(rc);
2958 }
2959
2960 static int osc_enqueue_interpret(const struct lu_env *env,
2961                                  struct ptlrpc_request *req,
2962                                  void *args, int rc)
2963 {
2964         struct osc_enqueue_args *aa = args;
2965         struct ldlm_lock *lock;
2966         struct lustre_handle *lockh = &aa->oa_lockh;
2967         enum ldlm_mode mode = aa->oa_mode;
2968         struct ost_lvb *lvb = aa->oa_lvb;
2969         __u32 lvb_len = sizeof(*lvb);
2970         __u64 flags = 0;
2971         struct ldlm_enqueue_info einfo = {
2972                 .ei_type = aa->oa_type,
2973                 .ei_mode = mode,
2974         };
2975
2976         ENTRY;
2977
2978         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2979          * be valid. */
2980         lock = ldlm_handle2lock(lockh);
2981         LASSERTF(lock != NULL,
2982                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2983                  lockh->cookie, req, aa);
2984
2985         /* Take an additional reference so that a blocking AST that
2986          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2987          * to arrive after an upcall has been executed by
2988          * osc_enqueue_fini(). */
2989         ldlm_lock_addref(lockh, mode);
2990
2991         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2992         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2993
2994         /* Let the CP AST grant the lock first. */
2995         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2996
2997         if (aa->oa_speculative) {
2998                 LASSERT(aa->oa_lvb == NULL);
2999                 LASSERT(aa->oa_flags == NULL);
3000                 aa->oa_flags = &flags;
3001         }
3002
3003         /* Complete the lock-obtaining procedure. */
3004         rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
3005                                    aa->oa_flags, lvb, lvb_len, lockh, rc,
3006                                    false);
3007         /* Complete osc stuff. */
3008         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
3009                               aa->oa_flags, aa->oa_speculative, rc);
3010
3011         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3012
3013         ldlm_lock_decref(lockh, mode);
3014         LDLM_LOCK_PUT(lock);
3015         RETURN(rc);
3016 }
3017
3018 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3019  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3020  * other synchronous requests; however, holding some locks while trying to
3021  * obtain others may take a considerable amount of time in the case of an OST
3022  * failure, and when other sync requests cannot get a lock released by a
3023  * client, that client is evicted from the cluster -- such scenarios make life
3024  * difficult, so release locks just after they are obtained. */
3025 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3026                      __u64 *flags, union ldlm_policy_data *policy,
3027                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
3028                      void *cookie, struct ldlm_enqueue_info *einfo,
3029                      struct ptlrpc_request_set *rqset, int async,
3030                      bool speculative)
3031 {
3032         struct obd_device *obd = exp->exp_obd;
3033         struct lustre_handle lockh = { 0 };
3034         struct ptlrpc_request *req = NULL;
3035         int intent = *flags & LDLM_FL_HAS_INTENT;
3036         __u64 search_flags = *flags;
3037         __u64 match_flags = 0;
3038         enum ldlm_mode mode;
3039         int rc;
3040         ENTRY;
3041
3042         /* Filesystem lock extents are extended to page boundaries so that
3043          * dealing with the page cache is a little smoother.  */
3044         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3045         policy->l_extent.end |= ~PAGE_MASK;
3046
3047         /* Next, search for already existing extent locks that will cover us */
3048         /* If we're trying to read, we also search for an existing PW lock.  The
3049          * VFS and page cache already protect us locally, so lots of readers/
3050          * writers can share a single PW lock.
3051          *
3052          * There are problems with conversion deadlocks, so instead of
3053          * converting a read lock to a write lock, we'll just enqueue a new
3054          * one.
3055          *
3056          * At some point we should cancel the read lock instead of making them
3057          * send us a blocking callback, but there are problems with canceling
3058          * locks out from other users right now, too. */
3059         mode = einfo->ei_mode;
3060         if (einfo->ei_mode == LCK_PR)
3061                 mode |= LCK_PW;
3062         /* Normal lock requests must wait for the LVB to be ready before
3063          * matching a lock; speculative lock requests do not need to,
3064          * because they will not actually use the lock. */
3065         if (!speculative)
3066                 search_flags |= LDLM_FL_LVB_READY;
3067         if (intent != 0)
3068                 search_flags |= LDLM_FL_BLOCK_GRANTED;
3069         if (mode == LCK_GROUP)
3070                 match_flags = LDLM_MATCH_GROUP;
3071         mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
3072                                          res_id, einfo->ei_type, policy, mode,
3073                                          &lockh, match_flags);
3074         if (mode) {
3075                 struct ldlm_lock *matched;
3076
3077                 if (*flags & LDLM_FL_TEST_LOCK)
3078                         RETURN(ELDLM_OK);
3079
3080                 matched = ldlm_handle2lock(&lockh);
3081                 if (speculative) {
3082                         /* This DLM lock request is speculative, and does not
3083                          * have an associated IO request. Therefore, if there
3084                          * is already a DLM lock, it will just inform the
3085                          * caller to cancel the request for this stripe. */
3086                         lock_res_and_lock(matched);
3087                         if (ldlm_extent_equal(&policy->l_extent,
3088                             &matched->l_policy_data.l_extent))
3089                                 rc = -EEXIST;
3090                         else
3091                                 rc = -ECANCELED;
3092                         unlock_res_and_lock(matched);
3093
3094                         ldlm_lock_decref(&lockh, mode);
3095                         LDLM_LOCK_PUT(matched);
3096                         RETURN(rc);
3097                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
3098                         *flags |= LDLM_FL_LVB_READY;
3099
3100                         /* We already have a lock, and it's referenced. */
3101                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
3102
3103                         ldlm_lock_decref(&lockh, mode);
3104                         LDLM_LOCK_PUT(matched);
3105                         RETURN(ELDLM_OK);
3106                 } else {
3107                         ldlm_lock_decref(&lockh, mode);
3108                         LDLM_LOCK_PUT(matched);
3109                 }
3110         }
3111
3112         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
3113                 RETURN(-ENOLCK);
3114
3115         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3116         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3117
3118         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3119                               sizeof(*lvb), LVB_T_OST, &lockh, async);
3120         if (async) {
3121                 if (!rc) {
3122                         struct osc_enqueue_args *aa;
3123                         aa = ptlrpc_req_async_args(aa, req);
3124                         aa->oa_exp         = exp;
3125                         aa->oa_mode        = einfo->ei_mode;
3126                         aa->oa_type        = einfo->ei_type;
3127                         lustre_handle_copy(&aa->oa_lockh, &lockh);
3128                         aa->oa_upcall      = upcall;
3129                         aa->oa_cookie      = cookie;
3130                         aa->oa_speculative = speculative;
3131                         if (!speculative) {
3132                                 aa->oa_flags  = flags;
3133                                 aa->oa_lvb    = lvb;
3134                         } else {
3135                                 /* speculative locks essentially enqueue
3136                                  * a DLM lock in advance, so we don't care
3137                                  * about the result of the enqueue. */
3138                                 aa->oa_lvb    = NULL;
3139                                 aa->oa_flags  = NULL;
3140                         }
3141
3142                         req->rq_interpret_reply = osc_enqueue_interpret;
3143                         ptlrpc_set_add_req(rqset, req);
3144                 }
3145                 RETURN(rc);
3146         }
3147
3148         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
3149                               flags, speculative, rc);
3150
3151         RETURN(rc);
3152 }
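/*
 * Hedged usage sketch (not part of the original file): an asynchronous
 * caller of osc_enqueue_base() supplies an upcall that osc_enqueue_fini()
 * invokes once the lock is granted, matched, or aborted.  my_upcall,
 * my_cookie and my_handle_result are hypothetical names:
 */
#if 0
static int my_upcall(void *cookie, struct lustre_handle *lockh, int errcode)
{
	/* errcode is ELDLM_OK, ELDLM_LOCK_MATCHED, or a negative errno */
	return my_handle_result(cookie, errcode);
}

	rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb,
			      my_upcall, my_cookie, &einfo, rqset,
			      1 /* async */, false /* not speculative */);
#endif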
3153
3154 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
3155                    struct ldlm_res_id *res_id, enum ldlm_type type,
3156                    union ldlm_policy_data *policy, enum ldlm_mode mode,
3157                    __u64 *flags, struct osc_object *obj,
3158                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
3159 {
3160         struct obd_device *obd = exp->exp_obd;
3161         __u64 lflags = *flags;
3162         enum ldlm_mode rc;
3163         ENTRY;
3164
3165         if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3166                 RETURN(-EIO);
3167
3168         /* Filesystem lock extents are extended to page boundaries so that
3169          * dealing with the page cache is a little smoother */
3170         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3171         policy->l_extent.end |= ~PAGE_MASK;
3172
3173         /* Next, search for already existing extent locks that will cover us */
3174         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
3175                                         res_id, type, policy, mode, lockh,
3176                                         match_flags);
3177         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
3178                 RETURN(rc);
3179
3180         if (obj != NULL) {
3181                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3182
3183                 LASSERT(lock != NULL);
3184                 if (osc_set_lock_data(lock, obj)) {
3185                         lock_res_and_lock(lock);
3186                         if (!ldlm_is_lvb_cached(lock)) {
3187                                 LASSERT(lock->l_ast_data == obj);
3188                                 osc_lock_lvb_update(env, obj, lock, NULL);
3189                                 ldlm_set_lvb_cached(lock);
3190                         }
3191                         unlock_res_and_lock(lock);
3192                 } else {
3193                         ldlm_lock_decref(lockh, rc);
3194                         rc = 0;
3195                 }
3196                 LDLM_LOCK_PUT(lock);
3197         }
3198         RETURN(rc);
3199 }
3200
3201 static int osc_statfs_interpret(const struct lu_env *env,
3202                                 struct ptlrpc_request *req, void *args, int rc)
3203 {
3204         struct osc_async_args *aa = args;
3205         struct obd_statfs *msfs;
3206
3207         ENTRY;
3208         if (rc == -EBADR)
3209                 /*
3210                  * The request has in fact never been sent due to issues at
3211                  * a higher level (LOV).  Exit immediately since the caller
3212                  * is aware of the problem and takes care of the clean up.
3213                  */
3214                 RETURN(rc);
3215
3216         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3217             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3218                 GOTO(out, rc = 0);
3219
3220         if (rc != 0)
3221                 GOTO(out, rc);
3222
3223         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3224         if (msfs == NULL)
3225                 GOTO(out, rc = -EPROTO);
3226
3227         *aa->aa_oi->oi_osfs = *msfs;
3228 out:
3229         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3230
3231         RETURN(rc);
3232 }
3233
3234 static int osc_statfs_async(struct obd_export *exp,
3235                             struct obd_info *oinfo, time64_t max_age,
3236                             struct ptlrpc_request_set *rqset)
3237 {
3238         struct obd_device     *obd = class_exp2obd(exp);
3239         struct ptlrpc_request *req;
3240         struct osc_async_args *aa;
3241         int rc;
3242         ENTRY;
3243
3244         if (obd->obd_osfs_age >= max_age) {
3245                 CDEBUG(D_SUPER,
3246                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3247                        obd->obd_name, &obd->obd_osfs,
3248                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3249                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3250                 spin_lock(&obd->obd_osfs_lock);
3251                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3252                 spin_unlock(&obd->obd_osfs_lock);
3253                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3254                 if (oinfo->oi_cb_up)
3255                         oinfo->oi_cb_up(oinfo, 0);
3256
3257                 RETURN(0);
3258         }
3259
3260         /* We could possibly pass max_age in the request (as an absolute
3261          * timestamp or a "seconds.usec ago") so the target can avoid doing
3262          * extra calls into the filesystem if that isn't necessary (e.g.
3263          * during mount that would help a bit).  Having relative timestamps
3264          * is not so great if request processing is slow, while absolute
3265          * timestamps are not ideal because they need time synchronization. */
3266         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3267         if (req == NULL)
3268                 RETURN(-ENOMEM);
3269
3270         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3271         if (rc) {
3272                 ptlrpc_request_free(req);
3273                 RETURN(rc);
3274         }
3275         ptlrpc_request_set_replen(req);
3276         req->rq_request_portal = OST_CREATE_PORTAL;
3277         ptlrpc_at_set_req_timeout(req);
3278
3279         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3280                 /* procfs requests must not wait on statfs, to avoid deadlock */
3281                 req->rq_no_resend = 1;
3282                 req->rq_no_delay = 1;
3283         }
3284
3285         req->rq_interpret_reply = osc_statfs_interpret;
3286         aa = ptlrpc_req_async_args(aa, req);
3287         aa->aa_oi = oinfo;
3288
3289         ptlrpc_set_add_req(rqset, req);
3290         RETURN(0);
3291 }
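/*
 * Hedged usage note (not part of the original file): max_age is an
 * absolute time64_t threshold, and the cached obd_osfs is served whenever
 * obd_osfs_age >= max_age.  A caller tolerating results up to
 * MY_STATFS_CACHE_SECONDS old (hypothetical constant) might pass:
 */
#if 0
	rc = osc_statfs_async(exp, oinfo,
			      ktime_get_seconds() - MY_STATFS_CACHE_SECONDS,
			      rqset);
#endif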
3292
3293 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3294                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3295 {
3296         struct obd_device     *obd = class_exp2obd(exp);
3297         struct obd_statfs     *msfs;
3298         struct ptlrpc_request *req;
3299         struct obd_import     *imp, *imp0;
3300         int rc;
3301         ENTRY;
3302
3303         /* Since the request might also come from lprocfs, we need to
3304          * sync this with client_disconnect_export(); see Bug 15684.
3305          */
3306         with_imp_locked(obd, imp0, rc)
3307                 imp = class_import_get(imp0);
3308         if (rc)
3309                 RETURN(rc);
3310
3311         /* We could possibly pass max_age in the request (as an absolute
3312          * timestamp or a "seconds.usec ago") so the target can avoid doing
3313          * extra calls into the filesystem if that isn't necessary (e.g.
3314          * during mount that would help a bit).  Having relative timestamps
3315          * is not so great if request processing is slow, while absolute
3316          * timestamps are not ideal because they need time synchronization. */
3317         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3318
3319         class_import_put(imp);
3320
3321         if (req == NULL)
3322                 RETURN(-ENOMEM);
3323
3324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3325         if (rc) {
3326                 ptlrpc_request_free(req);
3327                 RETURN(rc);
3328         }
3329         ptlrpc_request_set_replen(req);
3330         req->rq_request_portal = OST_CREATE_PORTAL;
3331         ptlrpc_at_set_req_timeout(req);
3332
3333         if (flags & OBD_STATFS_NODELAY) {
3334                 /* procfs requests must not wait on statfs, to avoid deadlock */
3335                 req->rq_no_resend = 1;
3336                 req->rq_no_delay = 1;
3337         }
3338
3339         rc = ptlrpc_queue_wait(req);
3340         if (rc)
3341                 GOTO(out, rc);
3342
3343         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3344         if (msfs == NULL)
3345                 GOTO(out, rc = -EPROTO);
3346
3347         *osfs = *msfs;
3348
3349         EXIT;
3350 out:
3351         ptlrpc_req_finished(req);
3352         return rc;
3353 }
3354
3355 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3356                          void *karg, void __user *uarg)
3357 {
3358         struct obd_device *obd = exp->exp_obd;
3359         struct obd_ioctl_data *data;
3360         int rc;
3361
3362         ENTRY;
3363         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
3364                obd->obd_name, cmd, len, karg, uarg);
3365
3366         if (!try_module_get(THIS_MODULE)) {
3367                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3368                        module_name(THIS_MODULE));
3369                 RETURN(-EINVAL);
3370         }
3371
3372         switch (cmd) {
3373         case OBD_IOC_CLIENT_RECOVER:
3374                 if (unlikely(karg == NULL)) {
3375                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3376                                       rc = -EINVAL);
3377                         break;
3378                 }
3379                 data = karg;
3380                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3381                                            data->ioc_inlbuf1, 0);
3382                 if (rc > 0)
3383                         rc = 0;
3384                 break;
3385         case OBD_IOC_GETATTR:
3386                 if (unlikely(karg == NULL)) {
3387                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3388                                       rc = -EINVAL);
3389                         break;
3390                 }
3391                 data = karg;
3392                 rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
3393                 break;
3394 #ifdef IOC_OSC_SET_ACTIVE
3395         case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
3396 #endif
3397         case OBD_IOC_SET_ACTIVE:
3398                 if (unlikely(karg == NULL)) {
3399                         OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3400                                       rc = -EINVAL);
3401                         break;
3402                 }
3403                 data = karg;
3404                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3405                                               data->ioc_offset);
3406                 break;
3407         default:
3408                 rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized",
3409                                    -ENOTTY);
3410                 break;
3411         }
3412
3413         module_put(THIS_MODULE);
3414         return rc;
3415 }
3416
3417 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3418                        u32 keylen, void *key, u32 vallen, void *val,
3419                        struct ptlrpc_request_set *set)
3420 {
3421         struct ptlrpc_request *req;
3422         struct obd_device *obd = exp->exp_obd;
3423         struct obd_import *imp = class_exp2cliimp(exp);
3424         char *tmp;
3425         int rc;
3426         ENTRY;
3427
3428         CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3429
3430         if (KEY_IS(KEY_CHECKSUM)) {
3431                 if (vallen != sizeof(int))
3432                         RETURN(-EINVAL);
3433                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3434                 RETURN(0);
3435         }
3436
3437         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3438                 sptlrpc_conf_client_adapt(obd);
3439                 RETURN(0);
3440         }
3441
3442         if (KEY_IS(KEY_FLUSH_CTX)) {
3443                 sptlrpc_import_flush_my_ctx(imp);
3444                 RETURN(0);
3445         }
3446
3447         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3448                 struct client_obd *cli = &obd->u.cli;
3449                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3450                 long target = *(long *)val;
3451
3452                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3453                 *(long *)val -= nr;
3454                 RETURN(0);
3455         }
3456
3457         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3458                 RETURN(-EINVAL);
3459
3460         /*
3461          * We pass all other commands directly to OST. Since nobody calls osc
3462          * methods directly and everybody is supposed to go through LOV, we
3463          * assume LOV checked invalid values for us.
3464          * The only recognised values so far are evict_by_nid and mds_conn.
3465          * Even if something bad goes through, we'd get a -EINVAL from OST
3466          * anyway.
3467          */
3468
3469         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3470                                                 &RQF_OST_SET_GRANT_INFO :
3471                                                 &RQF_OBD_SET_INFO);
3472         if (req == NULL)
3473                 RETURN(-ENOMEM);
3474
3475         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3476                              RCL_CLIENT, keylen);
3477         if (!KEY_IS(KEY_GRANT_SHRINK))
3478                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3479                                      RCL_CLIENT, vallen);
3480         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3481         if (rc) {
3482                 ptlrpc_request_free(req);
3483                 RETURN(rc);
3484         }
3485
3486         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3487         memcpy(tmp, key, keylen);
3488         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3489                                                         &RMF_OST_BODY :
3490                                                         &RMF_SETINFO_VAL);
3491         memcpy(tmp, val, vallen);
3492
3493         if (KEY_IS(KEY_GRANT_SHRINK)) {
3494                 struct osc_grant_args *aa;
3495                 struct obdo *oa;
3496
3497                 aa = ptlrpc_req_async_args(aa, req);
3498                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3499                 if (!oa) {
3500                         ptlrpc_req_finished(req);
3501                         RETURN(-ENOMEM);
3502                 }
3503                 *oa = ((struct ost_body *)val)->oa;
3504                 aa->aa_oa = oa;
3505                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3506         }
3507
3508         ptlrpc_request_set_replen(req);
3509         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3510                 LASSERT(set != NULL);
3511                 ptlrpc_set_add_req(set, req);
3512                 ptlrpc_check_set(NULL, set);
3513         } else {
3514                 ptlrpcd_add_req(req);
3515         }
3516
3517         RETURN(0);
3518 }
3519 EXPORT_SYMBOL(osc_set_info_async);
3520
3521 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3522                   struct obd_device *obd, struct obd_uuid *cluuid,
3523                   struct obd_connect_data *data, void *localdata)
3524 {
3525         struct client_obd *cli = &obd->u.cli;
3526
3527         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3528                 long lost_grant;
3529                 long grant;
3530
3531                 spin_lock(&cli->cl_loi_list_lock);
3532                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3533                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3534                         /* restore ocd_grant_blkbits as client page bits */
3535                         data->ocd_grant_blkbits = PAGE_SHIFT;
3536                         grant += cli->cl_dirty_grant;
3537                 } else {
3538                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3539                 }
3540                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3541                 lost_grant = cli->cl_lost_grant;
3542                 cli->cl_lost_grant = 0;
3543                 spin_unlock(&cli->cl_loi_list_lock);
3544
3545                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3546                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3547                        data->ocd_version, data->ocd_grant, lost_grant);
3548         }
3549
3550         RETURN(0);
3551 }
3552 EXPORT_SYMBOL(osc_reconnect);
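/*
 * Hedged worked example (illustrative numbers, not from the original file)
 * for the grant math in osc_reconnect() above: with 6 MB in cl_avail_grant
 * plus cl_reserved_grant and 2 MB of dirty pages outstanding, the client
 * asks the server to honour ocd_grant = 8 MB across the reconnect; only
 * when that sum is zero does it fall back to 2 * cli_brw_size(obd).
 */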
3553
3554 int osc_disconnect(struct obd_export *exp)
3555 {
3556         struct obd_device *obd = class_exp2obd(exp);
3557         int rc;
3558
3559         rc = client_disconnect_export(exp);
3560         /**
3561          * Initially we put del_shrink_grant before disconnect_export, but it
3562          * causes the following problem if setup (connect) and cleanup
3563          * (disconnect) are tangled together.
3564          *      connect p1                     disconnect p2
3565          *   ptlrpc_connect_import
3566          *     ...............               class_manual_cleanup
3567          *                                     osc_disconnect
3568          *                                     del_shrink_grant
3569          *   ptlrpc_connect_interrupt
3570          *     osc_init_grant
3571          *   add this client to shrink list
3572          *                                      cleanup_osc
3573          * Bang! The grant shrink thread triggers the shrink. BUG18662
3574          */
3575         osc_del_grant_list(&obd->u.cli);
3576         return rc;
3577 }
3578 EXPORT_SYMBOL(osc_disconnect);
3579
3580 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3581                                  struct hlist_node *hnode, void *arg)
3582 {
3583         struct lu_env *env = arg;
3584         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3585         struct ldlm_lock *lock;
3586         struct osc_object *osc = NULL;
3587         ENTRY;
3588
3589         lock_res(res);
3590         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3591                 if (lock->l_ast_data != NULL && osc == NULL) {
3592                         osc = lock->l_ast_data;
3593                         cl_object_get(osc2cl(osc));
3594                 }
3595
3596                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
3597                  * be canceled by the 2nd ldlm_namespace_cleanup() call in
3598                  * osc_import_event(). */
3599                 ldlm_clear_cleaned(lock);
3600         }
3601         unlock_res(res);
3602
3603         if (osc != NULL) {
3604                 osc_object_invalidate(env, osc);
3605                 cl_object_put(env, osc2cl(osc));
3606         }
3607
3608         RETURN(0);
3609 }
3610 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3611
3612 static int osc_import_event(struct obd_device *obd, struct obd_import *imp,
3613                             enum obd_import_event event)
3614 {
3615         struct client_obd *cli;
3616         int rc = 0;
3617
3618         ENTRY;
3619         if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
3620                 RETURN(-ENODEV);
3621
3622         switch (event) {
3623         case IMP_EVENT_DISCON: {
3624                 cli = &obd->u.cli;
3625                 if (!cli)
3626                         RETURN(-ENODEV);
3627                 spin_lock(&cli->cl_loi_list_lock);
3628                 cli->cl_avail_grant = 0;
3629                 cli->cl_lost_grant = 0;
3630                 spin_unlock(&cli->cl_loi_list_lock);
3631                 break;
3632         }
3633         case IMP_EVENT_INACTIVE: {
3634                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3635                 break;
3636         }
3637         case IMP_EVENT_INVALIDATE: {
3638                 struct ldlm_namespace *ns = obd->obd_namespace;
3639                 struct lu_env *env;
3640                 __u16 refcheck;
3641
3642                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3643
3644                 env = cl_env_get(&refcheck);
3645                 if (!IS_ERR(env)) {
3646                         osc_io_unplug(env, &obd->u.cli, NULL);
3647
3648                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3649                                                  osc_ldlm_resource_invalidate,
3650                                                  env, 0);
3651                         cl_env_put(env, &refcheck);
3652
3653                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3654                 } else {
3655                         rc = PTR_ERR(env);
3656                 }
3657                 break;
3658         }
3659         case IMP_EVENT_ACTIVE: {
3660                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3661                 break;
3662         }
3663         case IMP_EVENT_OCD: {
3664                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3665
3666                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3667                         osc_init_grant(&obd->u.cli, ocd);
3668
3669                 /* See bug 7198 */
3670                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3671                         imp->imp_client->cli_request_portal =
3672                                 OST_REQUEST_PORTAL;
3673
3674                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3675                 break;
3676         }
3677         case IMP_EVENT_DEACTIVATE: {
3678                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3679                 break;
3680         }
3681         case IMP_EVENT_ACTIVATE: {
3682                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3683                 break;
3684         }
3685         default:
3686                 CERROR("%s: Unknown import event %d: rc = %d\n",
3687                        obd->obd_name, event, -EINVAL);
3688                 LBUG();
3689         }
3690         RETURN(rc);
3691 }
3692
3693 /**
3694  * Determine whether the lock can be canceled before replaying the lock
3695  * during recovery; see bug 16774 for detailed information.
3696  *
3697  * \retval zero the lock can't be canceled
3698  * \retval other ok to cancel
3699  */
3700 static int osc_cancel_weight(struct ldlm_lock *lock)
3701 {
3702         /*
3703          * Cancel all unused, granted extent locks.
3704          */
3705         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3706             ldlm_is_granted(lock) &&
3707             osc_ldlm_weigh_ast(lock) == 0)
3708                 RETURN(1);
3709
3710         RETURN(0);
3711 }
3712
3713 static int brw_queue_work(const struct lu_env *env, void *data)
3714 {
3715         struct client_obd *cli = data;
3716
3717         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3718
3719         osc_io_unplug(env, cli, NULL);
3720         RETURN(0);
3721 }
3722
3723 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3724 {
3725         struct client_obd *cli = &obd->u.cli;
3726         void *handler;
3727         int rc;
3728
3729         ENTRY;
3730
3731         rc = ptlrpcd_addref();
3732         if (rc)
3733                 RETURN(rc);
3734
3735         rc = client_obd_setup(obd, lcfg);
3736         if (rc)
3737                 GOTO(out_ptlrpcd, rc);
3738
3739
3740         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3741         if (IS_ERR(handler))
3742                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3743         cli->cl_writeback_work = handler;
3744
3745         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3746         if (IS_ERR(handler))
3747                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3748         cli->cl_lru_work = handler;
3749
3750         rc = osc_quota_setup(obd);
3751         if (rc)
3752                 GOTO(out_ptlrpcd_work, rc);
3753
3754         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3755         cli->cl_root_squash = 0;
3756         osc_update_next_shrink(cli);
3757
3758         RETURN(rc);
3759
3760 out_ptlrpcd_work:
3761         if (cli->cl_writeback_work != NULL) {
3762                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3763                 cli->cl_writeback_work = NULL;
3764         }
3765         if (cli->cl_lru_work != NULL) {
3766                 ptlrpcd_destroy_work(cli->cl_lru_work);
3767                 cli->cl_lru_work = NULL;
3768         }
3769         client_obd_cleanup(obd);
3770 out_ptlrpcd:
3771         ptlrpcd_decref();
3772         RETURN(rc);
3773 }
3774 EXPORT_SYMBOL(osc_setup_common);
3775
3776 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3777 {
3778         struct client_obd *cli = &obd->u.cli;
3779         int                adding;
3780         int                added;
3781         int                req_count;
3782         int                rc;
3783
3784         ENTRY;
3785
3786         rc = osc_setup_common(obd, lcfg);
3787         if (rc < 0)
3788                 RETURN(rc);
3789
3790         rc = osc_tunables_init(obd);
3791         if (rc)
3792                 RETURN(rc);
3793
3794         /*
3795          * We try to control the total number of requests with an upper limit,
3796          * osc_reqpool_maxreqcount. There might be a race that causes over-limit
3797          * allocation, but that is fine.
3798          */
3799         req_count = atomic_read(&osc_pool_req_count);
3800         if (req_count < osc_reqpool_maxreqcount) {
3801                 adding = cli->cl_max_rpcs_in_flight + 2;
3802                 if (req_count + adding > osc_reqpool_maxreqcount)
3803                         adding = osc_reqpool_maxreqcount - req_count;
3804
3805                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3806                 atomic_add(added, &osc_pool_req_count);
3807         }
3808
3809         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3810
3811         spin_lock(&osc_shrink_lock);
3812         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3813         spin_unlock(&osc_shrink_lock);
3814         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3815         cli->cl_import->imp_idle_debug = D_HA;
3816
3817         RETURN(0);
3818 }
3819
3820 int osc_precleanup_common(struct obd_device *obd)
3821 {
3822         struct client_obd *cli = &obd->u.cli;
3823         ENTRY;
3824
3825         /* LU-464
3826          * for the echo client, the export may be on the zombie list; wait
3827          * for the zombie thread to cull it, because cli.cl_import will be
3828          * cleared in client_disconnect_export():
3829          *   class_export_destroy() -> obd_cleanup() ->
3830          *   echo_device_free() -> echo_client_cleanup() ->
3831          *   obd_disconnect() -> osc_disconnect() ->
3832          *   client_disconnect_export()
3833          */
3834         obd_zombie_barrier();
3835         if (cli->cl_writeback_work) {
3836                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3837                 cli->cl_writeback_work = NULL;
3838         }
3839
3840         if (cli->cl_lru_work) {
3841                 ptlrpcd_destroy_work(cli->cl_lru_work);
3842                 cli->cl_lru_work = NULL;
3843         }
3844
3845         obd_cleanup_client_import(obd);
3846         RETURN(0);
3847 }
3848 EXPORT_SYMBOL(osc_precleanup_common);
3849
3850 static int osc_precleanup(struct obd_device *obd)
3851 {
3852         ENTRY;
3853
3854         osc_precleanup_common(obd);
3855
3856         ptlrpc_lprocfs_unregister_obd(obd);
3857         RETURN(0);
3858 }
3859
3860 int osc_cleanup_common(struct obd_device *obd)
3861 {
3862         struct client_obd *cli = &obd->u.cli;
3863         int rc;
3864
3865         ENTRY;
3866
3867         spin_lock(&osc_shrink_lock);
3868         list_del(&cli->cl_shrink_list);
3869         spin_unlock(&osc_shrink_lock);
3870
3871         /* lru cleanup */
3872         if (cli->cl_cache != NULL) {
3873                 LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
3874                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3875                 list_del_init(&cli->cl_lru_osc);
3876                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3877                 cli->cl_lru_left = NULL;
3878                 cl_cache_decref(cli->cl_cache);
3879                 cli->cl_cache = NULL;
3880         }
3881
3882         /* free memory of osc quota cache */
3883         osc_quota_cleanup(obd);
3884
3885         rc = client_obd_cleanup(obd);
3886
3887         ptlrpcd_decref();
3888         RETURN(rc);
3889 }
3890 EXPORT_SYMBOL(osc_cleanup_common);
3891
3892 static const struct obd_ops osc_obd_ops = {
3893         .o_owner                = THIS_MODULE,
3894         .o_setup                = osc_setup,
3895         .o_precleanup           = osc_precleanup,
3896         .o_cleanup              = osc_cleanup_common,
3897         .o_add_conn             = client_import_add_conn,
3898         .o_del_conn             = client_import_del_conn,
3899         .o_connect              = client_connect_import,
3900         .o_reconnect            = osc_reconnect,
3901         .o_disconnect           = osc_disconnect,
3902         .o_statfs               = osc_statfs,
3903         .o_statfs_async         = osc_statfs_async,
3904         .o_create               = osc_create,
3905         .o_destroy              = osc_destroy,
3906         .o_getattr              = osc_getattr,
3907         .o_setattr              = osc_setattr,
3908         .o_iocontrol            = osc_iocontrol,
3909         .o_set_info_async       = osc_set_info_async,
3910         .o_import_event         = osc_import_event,
3911         .o_quotactl             = osc_quotactl,
3912 };
3913
3914 LIST_HEAD(osc_shrink_list);
3915 DEFINE_SPINLOCK(osc_shrink_lock);
3916
3917 #ifdef HAVE_SHRINKER_COUNT
3918 static struct shrinker osc_cache_shrinker = {
3919         .count_objects  = osc_cache_shrink_count,
3920         .scan_objects   = osc_cache_shrink_scan,
3921         .seeks          = DEFAULT_SEEKS,
3922 };
3923 #else
3924 static int osc_cache_shrink(struct shrinker *shrinker,
3925                             struct shrink_control *sc)
3926 {
3927         (void)osc_cache_shrink_scan(shrinker, sc);
3928
3929         return osc_cache_shrink_count(shrinker, sc);
3930 }
3931
3932 static struct shrinker osc_cache_shrinker = {
3933         .shrink   = osc_cache_shrink,
3934         .seeks    = DEFAULT_SEEKS,
3935 };
3936 #endif
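/*
 * On kernels without the split count_objects/scan_objects shrinker API
 * (no HAVE_SHRINKER_COUNT), the legacy single .shrink callback above
 * emulates it by scanning first and then reporting the remaining count.
 */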
3937
3938 static int __init osc_init(void)
3939 {
3940         unsigned int reqpool_size;
3941         unsigned int reqsize;
3942         int rc;
3943         ENTRY;
3944
3945         /* print the address of _any_ initialized kernel symbol from this
3946          * module, to allow debugging with gdb that doesn't support data
3947          * symbols from modules. */
3948         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3949
3950         rc = lu_kmem_init(osc_caches);
3951         if (rc)
3952                 RETURN(rc);
3953
3954         rc = register_shrinker(&osc_cache_shrinker);
3955         if (rc)
3956                 GOTO(out_kmem, rc);
3957
3958         /* This is obviously too much memory; we only prevent overflow here */
3959         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3960                 GOTO(out_shrinker, rc = -EINVAL);
3961
3962         reqpool_size = osc_reqpool_mem_max << 20;
3963
3964         reqsize = 1;
3965         while (reqsize < OST_IO_MAXREQSIZE)
3966                 reqsize = reqsize << 1;
3967
3968         /*
3969          * We don't enlarge the request count in the OSC pool according to
3970          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3971          * after a normal allocation has failed, so a small OSC pool won't
3972          * cause much performance degradation in most cases.
3973          */
3974         osc_reqpool_maxreqcount = reqpool_size / reqsize;
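	/*
	 * Worked example with illustrative values: if OST_IO_MAXREQSIZE were
	 * just over 64 KiB, the loop above rounds reqsize up to the next
	 * power of two, 128 KiB, so osc_reqpool_mem_max = 5 (MB) caps the
	 * pool at (5 << 20) / (128 << 10) = 40 requests.
	 */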
3975
3976         atomic_set(&osc_pool_req_count, 0);
3977         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3978                                           ptlrpc_add_rqs_to_pool);
3979
3980         if (osc_rq_pool == NULL)
3981                 GOTO(out_shrinker, rc = -ENOMEM);
3982
3983         rc = osc_start_grant_work();
3984         if (rc != 0)
3985                 GOTO(out_req_pool, rc);
3986
3987         rc = class_register_type(&osc_obd_ops, NULL, true,
3988                                  LUSTRE_OSC_NAME, &osc_device_type);
3989         if (rc < 0)
3990                 GOTO(out_stop_grant, rc);
3991
3992         RETURN(rc);
3993
3994 out_stop_grant:
3995         osc_stop_grant_work();
3996 out_req_pool:
3997         ptlrpc_free_rq_pool(osc_rq_pool);
3998 out_shrinker:
3999         unregister_shrinker(&osc_cache_shrinker);
4000 out_kmem:
4001         lu_kmem_fini(osc_caches);
4002
4003         RETURN(rc);
4004 }
4005
4006 static void __exit osc_exit(void)
4007 {
4008         class_unregister_type(LUSTRE_OSC_NAME);
4009         ptlrpc_free_rq_pool(osc_rq_pool);
4010         osc_stop_grant_work();
4011         unregister_shrinker(&osc_cache_shrinker);
4012         lu_kmem_fini(osc_caches);
4013 }
4014
4015 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4016 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4017 MODULE_VERSION(LUSTRE_VERSION_STRING);
4018 MODULE_LICENSE("GPL");
4019
4020 module_init(osc_init);
4021 module_exit(osc_exit);