/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

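/* seconds an unused OSC connection may stay idle before being disconnected;
 * runtime tunable.  The idle handling itself lives outside this excerpt. */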
static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

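/* grant-shrink RPCs reuse the async-args slot laid out for BRW requests */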
#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

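/* Pack @oa into the request's OST body, converting it to the wire obdo
 * format negotiated in the import's connect data. */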
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC is interpreted
 * @cookie:     Opaque argument passed through to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (the standard preallocate operation) is supported
 * currently. Other mode flags are not supported yet. ftruncate(2) and
 * truncate(2) are handled via a SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (which is standard prealloc) is supported now.
         * Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @oa. Found locks are added into the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported
         * originally, when we still want to cancel locks in advance and
         * just cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

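/* Throttle destroys: admit a new destroy RPC only while fewer than
 * cl_max_rpcs_in_flight are outstanding.  The inc/dec pair below is not
 * atomic as a whole, so after a failed attempt a racing decrement may have
 * made room again; wake a waiter in that case so it can retry. */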
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

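/* Report the client's cache and grant accounting to the server inside @oa:
 * o_dirty is the amount currently dirtied, o_undirty how much more we may
 * want to dirty, o_grant the grant we currently hold, and o_dropped the
 * lost grant being returned. */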
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * Grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

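/* cap on grant-shrink RPCs sent in one pass of the grant work handler */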
#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

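/* Periodic grant-shrink work: walk the registered clients, send at most
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, then re-arm the delayed
 * work for the earliest cl_next_shrink_grant still in the future. */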
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

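/* Cancel any pending delayed run and kick the grant work handler now. */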
void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
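                /* e.g. with 4KiB pages and 64KiB chunks (cl_chunkbits = 16),
                 * chunk_mask is ~0xf, so 100 pages rounds up to 112 */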
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume the reason this OSC got a short read is that it read beyond the
 * end of a stripe file; i.e. Lustre is reading a sparse file via the LOV,
 * and it _knows_ it's reading inside the file, it's just that this stripe
 * never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

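/* Validate the per-niobuf return codes of a BRW_WRITE reply: the reply must
 * carry one rc per niobuf, every rc must be zero, and the bulk descriptor
 * (if any) must have transferred exactly the number of bytes requested. */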
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
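/* T10-PI bulk checksum: generate the per-sector DIF guard tags for each page
 * with @fn, pack them into a scratch page, and hash the packed tags with the
 * OBD_CKSUM_T10_TOP algorithm to produce the final 32-bit checksum. */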
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

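/* Plain bulk checksum: hash the page data directly with the algorithm
 * selected by @cksum_type. */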
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

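/* Prepare an OST_READ/OST_WRITE BRW request covering @page_count pages in
 * @pga.  The portion below allocates the request (from the shared pool for
 * writes) and, for encrypted files, substitutes bounce pages holding
 * ciphertext before the pages are packed into the request. */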
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
        struct inode *inode;
        bool directio = false;

        ENTRY;
        inode = page2inode(pga[0]->pg);
        if (inode == NULL) {
                /* Try to get reference to inode from cl_page if we are
                 * dealing with direct IO, as handled pages are not
                 * actual page cache pages.
                 */
                struct osc_async_page *oap = brw_page2oap(pga[0]);
                struct cl_page *clpage = oap2cl_page(oap);

                inode = clpage->cp_inode;
                if (inode)
                        directio = true;
        }
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
                for (i = 0; i < page_count; i++) {
                        struct brw_page *pg = pga[i];
                        struct page *data_page = NULL;
                        bool retried = false;
                        bool lockedbymyself;
                        u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
                        struct address_space *map_orig = NULL;
                        pgoff_t index_orig;

retry_encrypt:
                        if (nunits & ~LUSTRE_ENCRYPTION_MASK)
                                nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
                                        LUSTRE_ENCRYPTION_UNIT_SIZE;
                        /* The page can already be locked when we arrive here.
                         * This is possible when cl_page_assume/vvp_page_assume
                         * is stuck on wait_on_page_writeback with page lock
                         * held. In this case there is no risk for the lock to
                         * be released while we are doing our encryption
                         * processing, because writeback against that page
                         * will only end in vvp_page_completion_write/
                         * cl_page_completion once the page is fully
                         * processed.
                         */
1456                         lockedbymyself = trylock_page(pg->pg);
1457                         if (directio) {
1458                                 map_orig = pg->pg->mapping;
1459                                 pg->pg->mapping = inode->i_mapping;
1460                                 index_orig = pg->pg->index;
1461                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1462                         }
1463                         data_page =
1464                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1465                                                                  nunits, 0,
1466                                                                  GFP_NOFS);
1467                         if (directio) {
1468                                 pg->pg->mapping = map_orig;
1469                                 pg->pg->index = index_orig;
1470                         }
1471                         if (lockedbymyself)
1472                                 unlock_page(pg->pg);
1473                         if (IS_ERR(data_page)) {
1474                                 rc = PTR_ERR(data_page);
1475                                 if (rc == -ENOMEM && !retried) {
1476                                         retried = true;
1477                                         rc = 0;
1478                                         goto retry_encrypt;
1479                                 }
1480                                 ptlrpc_request_free(req);
1481                                 RETURN(rc);
1482                         }
1483                         /* Set PageChecked flag on bounce page for
1484                          * disambiguation in osc_release_bounce_pages().
1485                          */
1486                         SetPageChecked(data_page);
1487                         pg->pg = data_page;
1488                         /* there should be no gap in the middle of the page array */
1489                         if (i == page_count - 1) {
1490                                 struct osc_async_page *oap = brw_page2oap(pg);
1491
1492                                 oa->o_size = oap->oap_count +
1493                                         oap->oap_obj_off + oap->oap_page_off;
1494                         }
1495                         /* len is forced to nunits and the relative offset
1496                          * to 0, so store the original clear-text values
1497                          */
1498                         pg->bp_count_diff = nunits - pg->count;
1499                         pg->count = nunits;
1500                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1501                         pg->off = pg->off & PAGE_MASK;
1502                 }
1503         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1504                 for (i = 0; i < page_count; i++) {
1505                         struct brw_page *pg = pga[i];
1506                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1507
1508                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1509                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1510                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1511                         /* count/off are forced to cover the whole encryption
1512                          * unit size so that all encrypted data is stored on
1513                          * the OST; adjust bp_{count,off}_diff to remember the
1514                          * size of the clear text.
1515                          */
1516                         pg->bp_count_diff = nunits - pg->count;
1517                         pg->count = nunits;
1518                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1519                         pg->off = pg->off & PAGE_MASK;
1520                 }
1521         }
1522
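        /*
         * Count the remote niobufs that will be needed: physically
         * contiguous pages with compatible flags are merged into a single
         * niobuf in the copy loop further down.
         */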
1523         for (niocount = i = 1; i < page_count; i++) {
1524                 if (!can_merge_pages(pga[i - 1], pga[i]))
1525                         niocount++;
1526         }
1527
1528         pill = &req->rq_pill;
1529         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1530                              sizeof(*ioobj));
1531         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1532                              niocount * sizeof(*niobuf));
1533
1534         for (i = 0; i < page_count; i++) {
1535                 short_io_size += pga[i]->count;
1536                 if (!inode || !IS_ENCRYPTED(inode)) {
1537                         pga[i]->bp_count_diff = 0;
1538                         pga[i]->bp_off_diff = 0;
1539                 }
1540         }
1541
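        /*
         * "Short io" inlines the payload in the request (write) or reply
         * (read) buffer itself instead of setting up an LNet bulk transfer,
         * which saves a network round-trip for tiny IOs. It is only possible
         * for a single contiguous niobuf on an import that negotiated short
         * io support.
         */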
1542         /* Check if read/write is small enough to be a short io. */
1543         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1544             !imp_connect_shortio(cli->cl_import))
1545                 short_io_size = 0;
1546
1547         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1548                              opc == OST_READ ? 0 : short_io_size);
1549         if (opc == OST_READ)
1550                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1551                                      short_io_size);
1552
1553         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1554         if (rc) {
1555                 ptlrpc_request_free(req);
1556                 RETURN(rc);
1557         }
1558         osc_set_io_portal(req);
1559
1560         ptlrpc_at_set_req_timeout(req);
1561         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1562          * retry logic */
1563         req->rq_no_retry_einprogress = 1;
1564
1565         if (short_io_size != 0) {
1566                 desc = NULL;
1567                 short_io_buf = NULL;
1568                 goto no_bulk;
1569         }
1570
1571         desc = ptlrpc_prep_bulk_imp(req, page_count,
1572                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1573                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1574                         PTLRPC_BULK_PUT_SINK),
1575                 OST_BULK_PORTAL,
1576                 &ptlrpc_bulk_kiov_pin_ops);
1577
1578         if (desc == NULL)
1579                 GOTO(out, rc = -ENOMEM);
1580         /* NB request now owns desc and will free it when req is freed */
1581 no_bulk:
1582         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1583         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1584         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1585         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1586
1587         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1588
1589         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1590          * and from_kgid(), because these requests are sent asynchronously.
1591          * Fortunately, the oa passed in already carries valid o_uid and o_gid
1592          * for these two operations, and filling them is enough for nrs-tbf,
1593          * see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set, to avoid
1594          * breaking other processing logic */
1595         body->oa.o_uid = oa->o_uid;
1596         body->oa.o_gid = oa->o_gid;
1597
1598         obdo_to_ioobj(oa, ioobj);
1599         ioobj->ioo_bufcnt = niocount;
1600         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1601          * of bulks that might be sent for this request. The actual number is
1602          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1603          * sends "max - 1" so that old clients sending "0" stay compatible,
1604          * and so the actual maximum is a power of two, not one less. LU-1431 */
1605         if (desc != NULL)
1606                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1607         else /* short io */
1608                 ioobj_max_brw_set(ioobj, 0);
1609
1610         if (short_io_size != 0) {
1611                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1612                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1613                         body->oa.o_flags = 0;
1614                 }
1615                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1616                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1617                        short_io_size);
1618                 if (opc == OST_WRITE) {
1619                         short_io_buf = req_capsule_client_get(pill,
1620                                                               &RMF_SHORT_IO);
1621                         LASSERT(short_io_buf != NULL);
1622                 }
1623         }
1624
1625         LASSERT(page_count > 0);
1626         pg_prev = pga[0];
1627         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1628                 struct brw_page *pg = pga[i];
1629                 int poff = pg->off & ~PAGE_MASK;
1630
1631                 LASSERT(pg->count > 0);
1632                 /* make sure there is no gap in the middle of the page array */
1633                 LASSERTF(page_count == 1 ||
1634                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1635                           ergo(i > 0 && i < page_count - 1,
1636                                poff == 0 && pg->count == PAGE_SIZE)   &&
1637                           ergo(i == page_count - 1, poff == 0)),
1638                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1639                          i, page_count, pg, pg->off, pg->count);
1640                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1641                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1642                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1643                          i, page_count,
1644                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1645                          pg_prev->pg, page_private(pg_prev->pg),
1646                          pg_prev->pg->index, pg_prev->off);
1647                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1648                         (pg->flag & OBD_BRW_SRVLOCK));
1649                 if (short_io_size != 0 && opc == OST_WRITE) {
1650                         unsigned char *ptr = kmap_atomic(pg->pg);
1651
1652                         LASSERT(short_io_size >= requested_nob + pg->count);
1653                         memcpy(short_io_buf + requested_nob,
1654                                ptr + poff,
1655                                pg->count);
1656                         kunmap_atomic(ptr);
1657                 } else if (short_io_size == 0) {
1658                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1659                                                          pg->count);
1660                 }
1661                 requested_nob += pg->count;
1662
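                /*
                 * Merge this page into the previous niobuf if the two are
                 * contiguous with compatible flags; this keeps the final
                 * niobuf count equal to the niocount computed earlier.
                 */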
1663                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1664                         niobuf--;
1665                         niobuf->rnb_len += pg->count;
1666                 } else {
1667                         niobuf->rnb_offset = pg->off;
1668                         niobuf->rnb_len    = pg->count;
1669                         niobuf->rnb_flags  = pg->flag;
1670                 }
1671                 pg_prev = pg;
1672         }
1673
1674         LASSERTF((void *)(niobuf - niocount) ==
1675                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1676                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1677                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1678
1679         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1680         if (resend) {
1681                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1682                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1683                         body->oa.o_flags = 0;
1684                 }
1685                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1686         }
1687
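        /*
         * Opportunistically return surplus grant to the OST by piggy-backing
         * it on this BRW rather than issuing a separate grant-shrink RPC.
         */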
1688         if (osc_should_shrink_grant(cli))
1689                 osc_shrink_grant_local(cli, &body->oa);
1690
1691         /* size[REQ_REC_OFF] is still sizeof(*body) */
1692         if (opc == OST_WRITE) {
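                /*
                 * Checksum the write pages now, before the bulk is sent, so
                 * the OST can verify what it pulls and a mismatch can be
                 * diagnosed in check_write_checksum(). Skipped when the
                 * sptlrpc bulk flavor already protects data integrity.
                 */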
1693                 if (cli->cl_checksum &&
1694                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1695                         /* store cl_cksum_type in a local variable since
1696                          * it can be changed via lprocfs */
1697                         enum cksum_types cksum_type = cli->cl_cksum_type;
1698
1699                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1700                                 body->oa.o_flags = 0;
1701
1702                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1703                                                                 cksum_type);
1704                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1705
1706                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1707                                                   requested_nob, page_count,
1708                                                   pga, OST_WRITE,
1709                                                   &body->oa.o_cksum);
1710                         if (rc < 0) {
1711                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1712                                        rc);
1713                                 GOTO(out, rc);
1714                         }
1715                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1716                                body->oa.o_cksum);
1717
1718                         /* save this in 'oa', too, for later checking */
1719                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1720                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1721                                                            cksum_type);
1722                 } else {
1723                         /* clear out the checksum flag, in case this is a
1724                          * resend but cl_checksum is no longer set. b=11238 */
1725                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1726                 }
1727                 oa->o_cksum = body->oa.o_cksum;
1728                 /* 1 RC per niobuf */
1729                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1730                                      sizeof(__u32) * niocount);
1731         } else {
1732                 if (cli->cl_checksum &&
1733                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1734                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1735                                 body->oa.o_flags = 0;
1736                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1737                                 cli->cl_cksum_type);
1738                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1739                 }
1740
1741                 /* The client cksum has already been copied to the wire obdo
1742                  * by the earlier lustre_set_wire_obdo(); in case a bulk read
1743                  * is being resent due to a cksum error, this lets the server
1744                  * check+dump the pages on its side */
1745         }
1746         ptlrpc_request_set_replen(req);
1747
1748         aa = ptlrpc_req_async_args(aa, req);
1749         aa->aa_oa = oa;
1750         aa->aa_requested_nob = requested_nob;
1751         aa->aa_nio_count = niocount;
1752         aa->aa_page_count = page_count;
1753         aa->aa_resends = 0;
1754         aa->aa_ppga = pga;
1755         aa->aa_cli = cli;
1756         INIT_LIST_HEAD(&aa->aa_oaps);
1757
1758         *reqp = req;
1759         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1760         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1761                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1762                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1763         RETURN(0);
1764
1765  out:
1766         ptlrpc_req_finished(req);
1767         RETURN(rc);
1768 }
1769
1770 char dbgcksum_file_name[PATH_MAX];
1771
1772 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1773                                 struct brw_page **pga, __u32 server_cksum,
1774                                 __u32 client_cksum)
1775 {
1776         struct file *filp;
1777         int rc, i;
1778         unsigned int len;
1779         char *buf;
1780
1781         /* only keep a dump of the pages from the first error on a given
1782          * file/fid range; O_EXCL makes resends/retries fail with -EEXIST. */
1783         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1784                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1785                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1786                   libcfs_debug_file_path_arr :
1787                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1788                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1789                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1790                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1791                  pga[0]->off,
1792                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1793                  client_cksum, server_cksum);
1794         filp = filp_open(dbgcksum_file_name,
1795                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1796         if (IS_ERR(filp)) {
1797                 rc = PTR_ERR(filp);
1798                 if (rc == -EEXIST)
1799                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1800                                "checksum error: rc = %d\n", dbgcksum_file_name,
1801                                rc);
1802                 else
1803                         CERROR("%s: can't open to dump pages with checksum "
1804                                "error: rc = %d\n", dbgcksum_file_name, rc);
1805                 return;
1806         }
1807
1808         for (i = 0; i < page_count; i++) {
1809                 len = pga[i]->count;
1810                 buf = kmap(pga[i]->pg);
1811                 while (len != 0) {
1812                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1813                         if (rc < 0) {
1814                                 CERROR("%s: wanted to write %u but got %d "
1815                                        "error\n", dbgcksum_file_name, len, rc);
1816                                 break;
1817                         }
1818                         len -= rc;
1819                         buf += rc;
1820                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1821                                dbgcksum_file_name, rc);
1822                 }
1823                 kunmap(pga[i]->pg);
1824         }
1825
1826         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1827         if (rc)
1828                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1829         filp_close(filp, NULL);
1830 }
1831
1832 static int
1833 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1834                      __u32 client_cksum, __u32 server_cksum,
1835                      struct osc_brw_async_args *aa)
1836 {
1837         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1838         enum cksum_types cksum_type;
1839         obd_dif_csum_fn *fn = NULL;
1840         int sector_size = 0;
1841         __u32 new_cksum;
1842         char *msg;
1843         int rc;
1844
1845         if (server_cksum == client_cksum) {
1846                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1847                 return 0;
1848         }
1849
1850         if (aa->aa_cli->cl_checksum_dump)
1851                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1852                                     server_cksum, client_cksum);
1853
1854         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1855                                            oa->o_flags : 0);
1856
1857         switch (cksum_type) {
1858         case OBD_CKSUM_T10IP512:
1859                 fn = obd_dif_ip_fn;
1860                 sector_size = 512;
1861                 break;
1862         case OBD_CKSUM_T10IP4K:
1863                 fn = obd_dif_ip_fn;
1864                 sector_size = 4096;
1865                 break;
1866         case OBD_CKSUM_T10CRC512:
1867                 fn = obd_dif_crc_fn;
1868                 sector_size = 512;
1869                 break;
1870         case OBD_CKSUM_T10CRC4K:
1871                 fn = obd_dif_crc_fn;
1872                 sector_size = 4096;
1873                 break;
1874         default:
1875                 break;
1876         }
1877
1878         if (fn)
1879                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1880                                              aa->aa_page_count, aa->aa_ppga,
1881                                              OST_WRITE, fn, sector_size,
1882                                              &new_cksum);
1883         else
1884                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1885                                        aa->aa_ppga, OST_WRITE, cksum_type,
1886                                        &new_cksum);
1887
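        /*
         * Recompute the checksum over the current page contents to triage
         * the mismatch: matching the server's value means the pages changed
         * after we first checksummed them (typical of mmap IO), while
         * matching our original value points at corruption in transit.
         */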
1888         if (rc < 0)
1889                 msg = "failed to calculate the client write checksum";
1890         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1891                 msg = "the server did not use the checksum type specified in "
1892                       "the original request - likely a protocol problem";
1893         else if (new_cksum == server_cksum)
1894                 msg = "changed on the client after we checksummed it - "
1895                       "likely false positive due to mmap IO (bug 11742)";
1896         else if (new_cksum == client_cksum)
1897                 msg = "changed in transit before arrival at OST";
1898         else
1899                 msg = "changed in transit AND doesn't match the original - "
1900                       "likely false positive due to mmap IO (bug 11742)";
1901
1902         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1903                            DFID " object "DOSTID" extent [%llu-%llu], original "
1904                            "client csum %x (type %x), server csum %x (type %x),"
1905                            " client csum now %x\n",
1906                            obd_name, msg, libcfs_nid2str(peer->nid),
1907                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1908                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1909                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1910                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1911                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1912                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1913                            client_cksum,
1914                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1915                            server_cksum, cksum_type, new_cksum);
1916         return 1;
1917 }
1918
1919 /* Note rc enters this function as the number of bytes transferred */
1920 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1921 {
1922         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1923         struct client_obd *cli = aa->aa_cli;
1924         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1925         const struct lnet_process_id *peer =
1926                 &req->rq_import->imp_connection->c_peer;
1927         struct ost_body *body;
1928         u32 client_cksum = 0;
1929         struct inode *inode;
1930         unsigned int blockbits = 0, blocksize = 0;
1931
1932         ENTRY;
1933
1934         if (rc < 0 && rc != -EDQUOT) {
1935                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1936                 RETURN(rc);
1937         }
1938
1939         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1940         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1941         if (body == NULL) {
1942                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1943                 RETURN(-EPROTO);
1944         }
1945
1946         /* set/clear the over-quota flag for a uid/gid/projid */
1947         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1948             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1949                 unsigned qid[LL_MAXQUOTAS] = {
1950                                          body->oa.o_uid, body->oa.o_gid,
1951                                          body->oa.o_projid };
1952                 CDEBUG(D_QUOTA,
1953                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1954                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1955                        body->oa.o_valid, body->oa.o_flags);
1956                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1957                                 body->oa.o_flags);
1958         }
1959
1960         osc_update_grant(cli, body);
1961
1962         if (rc < 0)
1963                 RETURN(rc);
1964
1965         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1966                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1967
1968         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1969                 if (rc > 0) {
1970                         CERROR("%s: unexpected positive size %d\n",
1971                                obd_name, rc);
1972                         RETURN(-EPROTO);
1973                 }
1974
1975                 if (req->rq_bulk != NULL &&
1976                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1977                         RETURN(-EAGAIN);
1978
1979                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1980                     check_write_checksum(&body->oa, peer, client_cksum,
1981                                          body->oa.o_cksum, aa))
1982                         RETURN(-EAGAIN);
1983
1984                 rc = check_write_rcs(req, aa->aa_requested_nob,
1985                                      aa->aa_nio_count, aa->aa_page_count,
1986                                      aa->aa_ppga);
1987                 GOTO(out, rc);
1988         }
1989
1990         /* The rest of this function executes only for OST_READs */
1991
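        /*
         * For a short io read there is no bulk descriptor, so the byte count
         * comes from the size of the RMF_SHORT_IO reply buffer, which the
         * server also returns in rq_status.
         */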
1992         if (req->rq_bulk == NULL) {
1993                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1994                                           RCL_SERVER);
1995                 LASSERT(rc == req->rq_status);
1996         } else {
1997                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1998                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1999         }
2000         if (rc < 0)
2001                 GOTO(out, rc = -EAGAIN);
2002
2003         if (rc > aa->aa_requested_nob) {
2004                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2005                        rc, aa->aa_requested_nob);
2006                 RETURN(-EPROTO);
2007         }
2008
2009         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2010                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2011                        rc, req->rq_bulk->bd_nob_transferred);
2012                 RETURN(-EPROTO);
2013         }
2014
2015         if (req->rq_bulk == NULL) {
2016                 /* short io */
2017                 int nob, pg_count, i = 0;
2018                 unsigned char *buf;
2019
2020                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2021                 pg_count = aa->aa_page_count;
2022                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2023                                                    rc);
2024                 nob = rc;
2025                 while (nob > 0 && pg_count > 0) {
2026                         unsigned char *ptr;
2027                         int count = aa->aa_ppga[i]->count > nob ?
2028                                     nob : aa->aa_ppga[i]->count;
2029
2030                         CDEBUG(D_CACHE, "page %p count %d\n",
2031                                aa->aa_ppga[i]->pg, count);
2032                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2033                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2034                                count);
2035                         kunmap_atomic((void *) ptr);
2036
2037                         buf += count;
2038                         nob -= count;
2039                         i++;
2040                         pg_count--;
2041                 }
2042         }
2043
2044         if (rc < aa->aa_requested_nob)
2045                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2046
2047         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2048                 static int cksum_counter;
2049                 u32        server_cksum = body->oa.o_cksum;
2050                 char      *via = "";
2051                 char      *router = "";
2052                 enum cksum_types cksum_type;
2053                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2054                         body->oa.o_flags : 0;
2055
2056                 cksum_type = obd_cksum_type_unpack(o_flags);
2057                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2058                                           aa->aa_page_count, aa->aa_ppga,
2059                                           OST_READ, &client_cksum);
2060                 if (rc < 0)
2061                         GOTO(out, rc);
2062
2063                 if (req->rq_bulk != NULL &&
2064                     peer->nid != req->rq_bulk->bd_sender) {
2065                         via = " via ";
2066                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2067                 }
2068
2069                 if (server_cksum != client_cksum) {
2070                         struct ost_body *clbody;
2071                         u32 page_count = aa->aa_page_count;
2072
2073                         clbody = req_capsule_client_get(&req->rq_pill,
2074                                                         &RMF_OST_BODY);
2075                         if (cli->cl_checksum_dump)
2076                                 dump_all_bulk_pages(&clbody->oa, page_count,
2077                                                     aa->aa_ppga, server_cksum,
2078                                                     client_cksum);
2079
2080                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2081                                            "%s%s%s inode "DFID" object "DOSTID
2082                                            " extent [%llu-%llu], client %x, "
2083                                            "server %x, cksum_type %x\n",
2084                                            obd_name,
2085                                            libcfs_nid2str(peer->nid),
2086                                            via, router,
2087                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2088                                                 clbody->oa.o_parent_seq : 0ULL,
2089                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2090                                                 clbody->oa.o_parent_oid : 0,
2091                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2092                                                 clbody->oa.o_parent_ver : 0,
2093                                            POSTID(&body->oa.o_oi),
2094                                            aa->aa_ppga[0]->off,
2095                                            aa->aa_ppga[page_count-1]->off +
2096                                            aa->aa_ppga[page_count-1]->count - 1,
2097                                            client_cksum, server_cksum,
2098                                            cksum_type);
2099                         cksum_counter = 0;
2100                         aa->aa_oa->o_cksum = client_cksum;
2101                         rc = -EAGAIN;
2102                 } else {
2103                         cksum_counter++;
2104                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2105                         rc = 0;
2106                 }
2107         } else if (unlikely(client_cksum)) {
2108                 static int cksum_missed;
2109
2110                 cksum_missed++;
2111                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2112                         CERROR("%s: checksum %u requested from %s but not sent\n",
2113                                obd_name, cksum_missed,
2114                                libcfs_nid2str(peer->nid));
2115         } else {
2116                 rc = 0;
2117         }
2118
2119         inode = page2inode(aa->aa_ppga[0]->pg);
2120         if (inode == NULL) {
2121                 /* Try to get a reference to the inode from the cl_page
2122                  * if we are dealing with direct IO, as the pages handled
2123                  * here are not actual page cache pages.
2124                  */
2125                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2126
2127                 inode = oap2cl_page(oap)->cp_inode;
2128                 if (inode) {
2129                         blockbits = inode->i_blkbits;
2130                         blocksize = 1 << blockbits;
2131                 }
2132         }
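        /*
         * Read-side counterpart of the encryption handling in
         * osc_brw_prep_request(): the data arrived as ciphertext and is now
         * decrypted in place, one LUSTRE_ENCRYPTION_UNIT_SIZE chunk at a
         * time, skipping chunks that are all zeroes (holes).
         */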
2133         if (inode && IS_ENCRYPTED(inode)) {
2134                 int idx;
2135
2136                 if (!llcrypt_has_encryption_key(inode)) {
2137                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2138                         GOTO(out, rc);
2139                 }
2140                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2141                         struct brw_page *pg = aa->aa_ppga[idx];
2142                         unsigned int offs = 0;
2143
2144                         while (offs < PAGE_SIZE) {
2145                                 /* do not decrypt if page is all 0s */
2146                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2147                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2148                                         /* if page is empty forward info to
2149                                          * upper layers (ll_io_zero_page) by
2150                                          * clearing PagePrivate2
2151                                          */
2152                                         if (!offs)
2153                                                 ClearPagePrivate2(pg->pg);
2154                                         break;
2155                                 }
2156
2157                                 if (blockbits) {
2158                                         /* This is the direct IO case. Call
2159                                          * the decrypt function that takes
2160                                          * the inode as input parameter.
2161                                          * The page does not need locking.
2162                                          */
2163                                         u64 lblk_num =
2164                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2165                                                      (PAGE_SHIFT - blockbits)) +
2166                                                        (offs >> blockbits);
2167                                         unsigned int i;
2168
2169                                         for (i = offs;
2170                                              i < offs +
2171                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2172                                              i += blocksize, lblk_num++) {
2173                                                 rc =
2174                                                   llcrypt_decrypt_block_inplace(
2175                                                           inode, pg->pg,
2176                                                           blocksize, i,
2177                                                           lblk_num);
2178                                                 if (rc)
2179                                                         break;
2180                                         }
2181                                 } else {
2182                                         rc = llcrypt_decrypt_pagecache_blocks(
2183                                                 pg->pg,
2184                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2185                                                 offs);
2186                                 }
2187                                 if (rc)
2188                                         GOTO(out, rc);
2189
2190                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2191                         }
2192                 }
2193         }
2194
2195 out:
2196         if (rc >= 0)
2197                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2198                                      aa->aa_oa, &body->oa);
2199
2200         RETURN(rc);
2201 }
2202
2203 static int osc_brw_redo_request(struct ptlrpc_request *request,
2204                                 struct osc_brw_async_args *aa, int rc)
2205 {
2206         struct ptlrpc_request *new_req;
2207         struct osc_brw_async_args *new_aa;
2208         struct osc_async_page *oap;
2209         ENTRY;
2210
2211         /* The below message is checked in replay-ost-single.sh test_8ae */
2212         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2213                   "redo for recoverable error %d", rc);
2214
2215         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2216                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2217                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2218                                   aa->aa_ppga, &new_req, 1);
2219         if (rc)
2220                 RETURN(rc);
2221
2222         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2223                 if (oap->oap_request != NULL) {
2224                         LASSERTF(request == oap->oap_request,
2225                                  "request %p != oap_request %p\n",
2226                                  request, oap->oap_request);
2227                 }
2228         }
2229         /*
2230          * New request takes over pga and oaps from old request.
2231          * Note that copying a list_head doesn't work, need to move it...
2232          */
2233         aa->aa_resends++;
2234         new_req->rq_interpret_reply = request->rq_interpret_reply;
2235         new_req->rq_async_args = request->rq_async_args;
2236         new_req->rq_commit_cb = request->rq_commit_cb;
2237         /* cap the resend delay to the current request timeout; this is
2238          * similar to what ptlrpc does (see after_reply()) */
2239         if (aa->aa_resends > new_req->rq_timeout)
2240                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2241         else
2242                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2243         new_req->rq_generation_set = 1;
2244         new_req->rq_import_generation = request->rq_import_generation;
2245
2246         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2247
2248         INIT_LIST_HEAD(&new_aa->aa_oaps);
2249         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2250         INIT_LIST_HEAD(&new_aa->aa_exts);
2251         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2252         new_aa->aa_resends = aa->aa_resends;
2253
2254         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2255                 if (oap->oap_request) {
2256                         ptlrpc_req_finished(oap->oap_request);
2257                         oap->oap_request = ptlrpc_request_addref(new_req);
2258                 }
2259         }
2260
2261         /* XXX: This code will run into problems if we ever support adding
2262          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2263          * waiting for all of them to finish. We should inherit the request
2264          * set from the old request. */
2265         ptlrpcd_add_req(new_req);
2266
2267         DEBUG_REQ(D_INFO, new_req, "new request");
2268         RETURN(0);
2269 }
2270
2271 /*
2272  * Ugh, we want disk allocation on the target to happen in offset order. We'll
2273  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
2274  * do fine for our small page arrays and doesn't require allocation. It's an
2275  * insertion sort that swaps elements that are strides apart, shrinking the
2276  * stride down until it's '1' and the array is sorted.
2277  */
2278 static void sort_brw_pages(struct brw_page **array, int num)
2279 {
2280         int stride, i, j;
2281         struct brw_page *tmp;
2282
2283         if (num == 1)
2284                 return;
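        /* pick the starting stride from Knuth's h = 3h + 1 gap sequence */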
2285         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2286                 ;
2287
2288         do {
2289                 stride /= 3;
2290                 for (i = stride ; i < num ; i++) {
2291                         tmp = array[i];
2292                         j = i;
2293                         while (j >= stride && array[j - stride]->off > tmp->off) {
2294                                 array[j] = array[j - stride];
2295                                 j -= stride;
2296                         }
2297                         array[j] = tmp;
2298                 }
2299         } while (stride > 1);
2300 }
2301
2302 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2303 {
2304         LASSERT(ppga != NULL);
2305         OBD_FREE_PTR_ARRAY(ppga, count);
2306 }
2307
2308 static int brw_interpret(const struct lu_env *env,
2309                          struct ptlrpc_request *req, void *args, int rc)
2310 {
2311         struct osc_brw_async_args *aa = args;
2312         struct osc_extent *ext;
2313         struct osc_extent *tmp;
2314         struct client_obd *cli = aa->aa_cli;
2315         unsigned long transferred = 0;
2316
2317         ENTRY;
2318
2319         rc = osc_brw_fini_request(req, rc);
2320         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2321
2322         /* restore clear text pages */
2323         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2324
2325         /*
2326          * When the server returns -EINPROGRESS, the client should always
2327          * retry regardless of how many times the bulk was already resent.
2328          */
2329         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2330                 if (req->rq_import_generation !=
2331                     req->rq_import->imp_generation) {
2332                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2333                                ""DOSTID", rc = %d.\n",
2334                                req->rq_import->imp_obd->obd_name,
2335                                POSTID(&aa->aa_oa->o_oi), rc);
2336                 } else if (rc == -EINPROGRESS ||
2337                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2338                         rc = osc_brw_redo_request(req, aa, rc);
2339                 } else {
2340                         CERROR("%s: too many resent retries for object: "
2341                                "%llu:%llu, rc = %d.\n",
2342                                req->rq_import->imp_obd->obd_name,
2343                                POSTID(&aa->aa_oa->o_oi), rc);
2344                 }
2345
2346                 if (rc == 0)
2347                         RETURN(0);
2348                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2349                         rc = -EIO;
2350         }
2351
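        /*
         * On success, fold the attributes the OST returned in the reply obdo
         * (blocks, [amc]times, and possibly size/KMS for writes) back into
         * the cl_object so cached attributes stay coherent without an extra
         * getattr RPC.
         */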
2352         if (rc == 0) {
2353                 struct obdo *oa = aa->aa_oa;
2354                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2355                 unsigned long valid = 0;
2356                 struct cl_object *obj;
2357                 struct osc_async_page *last;
2358
2359                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2360                 obj = osc2cl(last->oap_obj);
2361
2362                 cl_object_attr_lock(obj);
2363                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2364                         attr->cat_blocks = oa->o_blocks;
2365                         valid |= CAT_BLOCKS;
2366                 }
2367                 if (oa->o_valid & OBD_MD_FLMTIME) {
2368                         attr->cat_mtime = oa->o_mtime;
2369                         valid |= CAT_MTIME;
2370                 }
2371                 if (oa->o_valid & OBD_MD_FLATIME) {
2372                         attr->cat_atime = oa->o_atime;
2373                         valid |= CAT_ATIME;
2374                 }
2375                 if (oa->o_valid & OBD_MD_FLCTIME) {
2376                         attr->cat_ctime = oa->o_ctime;
2377                         valid |= CAT_CTIME;
2378                 }
2379
2380                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2381                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2382                         loff_t last_off = last->oap_count + last->oap_obj_off +
2383                                 last->oap_page_off;
2384
2385                         /* Change the file size if this is an out-of-quota or
2386                          * direct IO write and it extends the file size */
2387                         if (loi->loi_lvb.lvb_size < last_off) {
2388                                 attr->cat_size = last_off;
2389                                 valid |= CAT_SIZE;
2390                         }
2391                         /* Extend KMS if it's not a lockless write */
2392                         if (loi->loi_kms < last_off &&
2393                             oap2osc_page(last)->ops_srvlock == 0) {
2394                                 attr->cat_kms = last_off;
2395                                 valid |= CAT_KMS;
2396                         }
2397                 }
2398
2399                 if (valid != 0)
2400                         cl_object_attr_update(env, obj, attr, valid);
2401                 cl_object_attr_unlock(obj);
2402         }
2403         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2404         aa->aa_oa = NULL;
2405
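        /* pages written by this RPC stay accounted as "unstable" until the
         * OST commits them to disk; see brw_commit()/osc_dec_unstable_pages() */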
2406         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2407                 osc_inc_unstable_pages(req);
2408
2409         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2410                 list_del_init(&ext->oe_link);
2411                 osc_extent_finish(env, ext, 1,
2412                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2413         }
2414         LASSERT(list_empty(&aa->aa_exts));
2415         LASSERT(list_empty(&aa->aa_oaps));
2416
2417         transferred = (req->rq_bulk == NULL ? /* short io */
2418                        aa->aa_requested_nob :
2419                        req->rq_bulk->bd_nob_transferred);
2420
2421         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2422         ptlrpc_lprocfs_brw(req, transferred);
2423
2424         spin_lock(&cli->cl_loi_list_lock);
2425         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2426          * is called so we know whether to go to sync BRWs or wait for more
2427          * RPCs to complete */
2428         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2429                 cli->cl_w_in_flight--;
2430         else
2431                 cli->cl_r_in_flight--;
2432         osc_wake_cache_waiters(cli);
2433         spin_unlock(&cli->cl_loi_list_lock);
2434
2435         osc_io_unplug(env, cli, NULL);
2436         RETURN(rc);
2437 }
2438
2439 static void brw_commit(struct ptlrpc_request *req)
2440 {
2441         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2442          * this callback (called via rq_commit_cb), we need to ensure
2443          * osc_dec_unstable_pages is still called. Otherwise unstable
2444          * pages may be leaked. */
2445         spin_lock(&req->rq_lock);
2446         if (likely(req->rq_unstable)) {
2447                 req->rq_unstable = 0;
2448                 spin_unlock(&req->rq_lock);
2449
2450                 osc_dec_unstable_pages(req);
2451         } else {
2452                 req->rq_committed = 1;
2453                 spin_unlock(&req->rq_lock);
2454         }
2455 }
2456
2457 /**
2458  * Build an RPC from the list of extents @ext_list. The caller must ensure
2459  * that the total number of pages in this list does NOT exceed the max
2460  * pages per RPC. Extents in the list must be in OES_RPC state.
2461  */
2462 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2463                   struct list_head *ext_list, int cmd)
2464 {
2465         struct ptlrpc_request           *req = NULL;
2466         struct osc_extent               *ext;
2467         struct brw_page                 **pga = NULL;
2468         struct osc_brw_async_args       *aa = NULL;
2469         struct obdo                     *oa = NULL;
2470         struct osc_async_page           *oap;
2471         struct osc_object               *obj = NULL;
2472         struct cl_req_attr              *crattr = NULL;
2473         loff_t                          starting_offset = OBD_OBJECT_EOF;
2474         loff_t                          ending_offset = 0;
2475         /* '1' for consistency with code that checks !mpflag to restore */
2476         int mpflag = 1;
2477         int                             mem_tight = 0;
2478         int                             page_count = 0;
2479         bool                            soft_sync = false;
2480         bool                            ndelay = false;
2481         int                             i;
2482         int                             grant = 0;
2483         int                             rc;
2484         __u32                           layout_version = 0;
2485         LIST_HEAD(rpc_list);
2486         struct ost_body                 *body;
2487         ENTRY;
2488         LASSERT(!list_empty(ext_list));
2489
2490         /* add pages into rpc_list to build BRW rpc */
2491         list_for_each_entry(ext, ext_list, oe_link) {
2492                 LASSERT(ext->oe_state == OES_RPC);
2493                 mem_tight |= ext->oe_memalloc;
2494                 grant += ext->oe_grants;
2495                 page_count += ext->oe_nr_pages;
2496                 layout_version = max(layout_version, ext->oe_layout_version);
2497                 if (obj == NULL)
2498                         obj = ext->oe_obj;
2499         }
2500
2501         soft_sync = osc_over_unstable_soft_limit(cli);
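        /*
         * If any extent was dirtied under memory pressure, build and send
         * this RPC in MEMALLOC context: allocations made while flushing
         * dirty pages may then dip into reserves rather than recursing
         * into reclaim, which could deadlock against this very writeback.
         */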
2502         if (mem_tight)
2503                 mpflag = memalloc_noreclaim_save();
2504
2505         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2506         if (pga == NULL)
2507                 GOTO(out, rc = -ENOMEM);
2508
2509         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2510         if (oa == NULL)
2511                 GOTO(out, rc = -ENOMEM);
2512
2513         i = 0;
2514         list_for_each_entry(ext, ext_list, oe_link) {
2515                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2516                         if (mem_tight)
2517                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2518                         if (soft_sync)
2519                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2520                         pga[i] = &oap->oap_brw_page;
2521                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2522                         i++;
2523
2524                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2525                         if (starting_offset == OBD_OBJECT_EOF ||
2526                             starting_offset > oap->oap_obj_off)
2527                                 starting_offset = oap->oap_obj_off;
2528                         else
2529                                 LASSERT(oap->oap_page_off == 0);
2530                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2531                                 ending_offset = oap->oap_obj_off +
2532                                                 oap->oap_count;
2533                         else
2534                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2535                                         PAGE_SIZE);
2536                 }
2537                 if (ext->oe_ndelay)
2538                         ndelay = true;
2539         }
2540
2541         /* first page in the list */
2542         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2543
2544         crattr = &osc_env_info(env)->oti_req_attr;
2545         memset(crattr, 0, sizeof(*crattr));
2546         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2547         crattr->cra_flags = ~0ULL;
2548         crattr->cra_page = oap2cl_page(oap);
2549         crattr->cra_oa = oa;
2550         cl_req_attr_set(env, osc2cl(obj), crattr);
2551
2552         if (cmd == OBD_BRW_WRITE) {
2553                 oa->o_grant_used = grant;
2554                 if (layout_version > 0) {
2555                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2556                                PFID(&oa->o_oi.oi_fid), layout_version);
2557
2558                         oa->o_layout_version = layout_version;
2559                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2560                 }
2561         }
2562
2563         sort_brw_pages(pga, page_count);
2564         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2565         if (rc != 0) {
2566                 CERROR("prep_req failed: %d\n", rc);
2567                 GOTO(out, rc);
2568         }
2569
2570         req->rq_commit_cb = brw_commit;
2571         req->rq_interpret_reply = brw_interpret;
2572         req->rq_memalloc = mem_tight != 0;
2573         oap->oap_request = ptlrpc_request_addref(req);
2574         if (ndelay) {
2575                 req->rq_no_resend = req->rq_no_delay = 1;
2576                 /* we should probably set a shorter timeout value here to
2577                  * handle ETIMEDOUT in brw_interpret() correctly. */
2578                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2579         }
2580
2581         /* Need to update the timestamps after the request is built in case
2582          * we race with setattr (locally or in queue at the OST).  If the OST
2583          * gets the later setattr before the earlier BRW (as determined by the
2584          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2585          * is no obvious way to do this in a single call.  bug 10150 */
2586         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2587         crattr->cra_oa = &body->oa;
2588         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2589         cl_req_attr_set(env, osc2cl(obj), crattr);
2590         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2591
2592         aa = ptlrpc_req_async_args(aa, req);
2593         INIT_LIST_HEAD(&aa->aa_oaps);
2594         list_splice_init(&rpc_list, &aa->aa_oaps);
2595         INIT_LIST_HEAD(&aa->aa_exts);
2596         list_splice_init(ext_list, &aa->aa_exts);
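        /*
         * The request now owns the async pages and extents; both are handed
         * back and released in brw_interpret() when the RPC completes.
         */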
2597
2598         spin_lock(&cli->cl_loi_list_lock);
2599         starting_offset >>= PAGE_SHIFT;
2600         if (cmd == OBD_BRW_READ) {
2601                 cli->cl_r_in_flight++;
2602                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2603                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2604                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2605                                       starting_offset + 1);
2606         } else {
2607                 cli->cl_w_in_flight++;
2608                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2609                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2610                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2611                                       starting_offset + 1);
2612         }
2613         spin_unlock(&cli->cl_loi_list_lock);
2614
2615         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2616                   page_count, aa, cli->cl_r_in_flight,
2617                   cli->cl_w_in_flight);
2618         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2619
2620         ptlrpcd_add_req(req);
2621         rc = 0;
2622         EXIT;
2623
2624 out:
2625         if (mem_tight)
2626                 memalloc_noreclaim_restore(mpflag);
2627
2628         if (rc != 0) {
2629                 LASSERT(req == NULL);
2630
2631                 if (oa)
2632                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2633                 if (pga) {
2634                         osc_release_bounce_pages(pga, page_count);
2635                         osc_release_ppga(pga, page_count);
2636                 }
                /* This should happen rarely and is pretty bad; it makes the
                 * pending list stop following the dirty order. */
2639                 while (!list_empty(ext_list)) {
2640                         ext = list_entry(ext_list->next, struct osc_extent,
2641                                          oe_link);
2642                         list_del_init(&ext->oe_link);
2643                         osc_extent_finish(env, ext, 0, rc);
2644                 }
2645         }
2646         RETURN(rc);
2647 }
2648
2649 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2650 {
2651         int set = 0;
2652
2653         LASSERT(lock != NULL);
2654
2655         lock_res_and_lock(lock);
2656
2657         if (lock->l_ast_data == NULL)
2658                 lock->l_ast_data = data;
2659         if (lock->l_ast_data == data)
2660                 set = 1;
2661
2662         unlock_res_and_lock(lock);
2663
2664         return set;
2665 }
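
/*
 * A minimal usage sketch (illustrative only, hence compiled out): a caller
 * holding a lock reference claims the lock for its object and backs off if
 * the lock already belongs to another object.  "obj", "lockh", "mode" and
 * use_the_lock() are hypothetical; see osc_enqueue_base() below for the
 * real call site.
 */
#if 0
        if (osc_set_lock_data(lock, obj)) {
                /* l_ast_data now points (or already pointed) at obj */
                use_the_lock(lock);
        } else {
                /* the lock is owned by another object; drop our reference */
                ldlm_lock_decref(lockh, mode);
        }
#endif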
2666
2667 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2668                      void *cookie, struct lustre_handle *lockh,
2669                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2670                      int errcode)
2671 {
2672         bool intent = *flags & LDLM_FL_HAS_INTENT;
2673         int rc;
2674         ENTRY;
2675
2676         /* The request was created before ldlm_cli_enqueue call. */
2677         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2678                 struct ldlm_reply *rep;
2679
2680                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2681                 LASSERT(rep != NULL);
2682
2683                 rep->lock_policy_res1 =
2684                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2685                 if (rep->lock_policy_res1)
2686                         errcode = rep->lock_policy_res1;
2687                 if (!speculative)
2688                         *flags |= LDLM_FL_LVB_READY;
2689         } else if (errcode == ELDLM_OK) {
2690                 *flags |= LDLM_FL_LVB_READY;
2691         }
2692
2693         /* Call the update callback. */
2694         rc = (*upcall)(cookie, lockh, errcode);
2695
2696         /* release the reference taken in ldlm_cli_enqueue() */
2697         if (errcode == ELDLM_LOCK_MATCHED)
2698                 errcode = ELDLM_OK;
2699         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2700                 ldlm_lock_decref(lockh, mode);
2701
2702         RETURN(rc);
2703 }
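
/*
 * Illustrative sketch (compiled out): the upcall invoked by
 * osc_enqueue_fini() follows the osc_enqueue_upcall_f contract -- it
 * receives the caller's cookie, the lock handle, and the translated
 * error code.  The handler below is hypothetical.
 */
#if 0
static int my_enqueue_upcall(void *cookie, struct lustre_handle *lockh,
                             int errcode)
{
        /* resume whatever IO was waiting on this lock */
        return errcode;
}
#endif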
2704
2705 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2706                           void *args, int rc)
2707 {
2708         struct osc_enqueue_args *aa = args;
2709         struct ldlm_lock *lock;
2710         struct lustre_handle *lockh = &aa->oa_lockh;
2711         enum ldlm_mode mode = aa->oa_mode;
2712         struct ost_lvb *lvb = aa->oa_lvb;
2713         __u32 lvb_len = sizeof(*lvb);
2714         __u64 flags = 0;
2715         struct ldlm_enqueue_info einfo = {
2716                 .ei_type = aa->oa_type,
2717                 .ei_mode = mode,
2718         };
2719
2720         ENTRY;
2721
2722         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2723          * be valid. */
2724         lock = ldlm_handle2lock(lockh);
2725         LASSERTF(lock != NULL,
2726                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2727                  lockh->cookie, req, aa);
2728
2729         /* Take an additional reference so that a blocking AST that
2730          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2731          * to arrive after an upcall has been executed by
2732          * osc_enqueue_fini(). */
2733         ldlm_lock_addref(lockh, mode);
2734
2735         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2736         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2737
        /* Let the CP AST grant the lock first. */
2739         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2740
2741         if (aa->oa_speculative) {
2742                 LASSERT(aa->oa_lvb == NULL);
2743                 LASSERT(aa->oa_flags == NULL);
2744                 aa->oa_flags = &flags;
2745         }
2746
        /* Complete the lock acquisition procedure. */
2748         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2749                                    lvb, lvb_len, lockh, rc);
        /* Complete the OSC-side processing. */
2751         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2752                               aa->oa_flags, aa->oa_speculative, rc);
2753
2754         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2755
2756         ldlm_lock_decref(lockh, mode);
2757         LDLM_LOCK_PUT(lock);
2758         RETURN(rc);
2759 }
2760
/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
 * with other synchronous requests, but holding some locks while trying to
 * obtain others may take a considerable amount of time in case of an OST
 * failure; and when other sync requests cannot get a lock released by a
 * client, that client is evicted from the cluster -- such scenarios make
 * life difficult, so release locks just after they are obtained. */
2768 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2769                      __u64 *flags, union ldlm_policy_data *policy,
2770                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2771                      void *cookie, struct ldlm_enqueue_info *einfo,
2772                      struct ptlrpc_request_set *rqset, int async,
2773                      bool speculative)
2774 {
2775         struct obd_device *obd = exp->exp_obd;
2776         struct lustre_handle lockh = { 0 };
2777         struct ptlrpc_request *req = NULL;
2778         int intent = *flags & LDLM_FL_HAS_INTENT;
2779         __u64 match_flags = *flags;
2780         enum ldlm_mode mode;
2781         int rc;
2782         ENTRY;
2783
2784         /* Filesystem lock extents are extended to page boundaries so that
2785          * dealing with the page cache is a little smoother.  */
2786         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2787         policy->l_extent.end |= ~PAGE_MASK;
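        /* e.g. with 4KB pages, an extent [5000, 9000] is widened to
         * [4096, 12287]: start rounds down and end rounds up to page
         * boundaries */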
2788
2789         /* Next, search for already existing extent locks that will cover us */
2790         /* If we're trying to read, we also search for an existing PW lock.  The
2791          * VFS and page cache already protect us locally, so lots of readers/
2792          * writers can share a single PW lock.
2793          *
2794          * There are problems with conversion deadlocks, so instead of
2795          * converting a read lock to a write lock, we'll just enqueue a new
2796          * one.
2797          *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from under other users right now, too. */
2801         mode = einfo->ei_mode;
2802         if (einfo->ei_mode == LCK_PR)
2803                 mode |= LCK_PW;
2804         /* Normal lock requests must wait for the LVB to be ready before
2805          * matching a lock; speculative lock requests do not need to,
2806          * because they will not actually use the lock. */
2807         if (!speculative)
2808                 match_flags |= LDLM_FL_LVB_READY;
2809         if (intent != 0)
2810                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2811         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2812                                einfo->ei_type, policy, mode, &lockh);
2813         if (mode) {
2814                 struct ldlm_lock *matched;
2815
2816                 if (*flags & LDLM_FL_TEST_LOCK)
2817                         RETURN(ELDLM_OK);
2818
2819                 matched = ldlm_handle2lock(&lockh);
2820                 if (speculative) {
                        /* This DLM lock request is speculative, and does not
                         * have an associated IO request.  Therefore, if there
                         * is already a DLM lock, we just inform the caller to
                         * cancel the request for this stripe. */
2825                         lock_res_and_lock(matched);
2826                         if (ldlm_extent_equal(&policy->l_extent,
2827                             &matched->l_policy_data.l_extent))
2828                                 rc = -EEXIST;
2829                         else
2830                                 rc = -ECANCELED;
2831                         unlock_res_and_lock(matched);
2832
2833                         ldlm_lock_decref(&lockh, mode);
2834                         LDLM_LOCK_PUT(matched);
2835                         RETURN(rc);
2836                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2837                         *flags |= LDLM_FL_LVB_READY;
2838
2839                         /* We already have a lock, and it's referenced. */
2840                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2841
2842                         ldlm_lock_decref(&lockh, mode);
2843                         LDLM_LOCK_PUT(matched);
2844                         RETURN(ELDLM_OK);
2845                 } else {
2846                         ldlm_lock_decref(&lockh, mode);
2847                         LDLM_LOCK_PUT(matched);
2848                 }
2849         }
2850
2851         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2852                 RETURN(-ENOLCK);
2853
2854         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2855         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2856
2857         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2858                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2859         if (async) {
2860                 if (!rc) {
2861                         struct osc_enqueue_args *aa;
2862                         aa = ptlrpc_req_async_args(aa, req);
2863                         aa->oa_exp         = exp;
2864                         aa->oa_mode        = einfo->ei_mode;
2865                         aa->oa_type        = einfo->ei_type;
2866                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2867                         aa->oa_upcall      = upcall;
2868                         aa->oa_cookie      = cookie;
2869                         aa->oa_speculative = speculative;
2870                         if (!speculative) {
2871                                 aa->oa_flags  = flags;
2872                                 aa->oa_lvb    = lvb;
2873                         } else {
                                /* Speculative locks essentially enqueue a DLM
                                 * lock in advance, so we don't care about the
                                 * result of the enqueue. */
2877                                 aa->oa_lvb    = NULL;
2878                                 aa->oa_flags  = NULL;
2879                         }
2880
2881                         req->rq_interpret_reply = osc_enqueue_interpret;
2882                         ptlrpc_set_add_req(rqset, req);
2883                 }
2884                 RETURN(rc);
2885         }
2886
2887         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2888                               flags, speculative, rc);
2889
2890         RETURN(rc);
2891 }
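
/*
 * Illustrative sketch (compiled out): an async caller hands in a request
 * set and receives the result through the upcall.  All names except
 * osc_enqueue_base() are hypothetical.
 */
#if 0
        rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb,
                              my_enqueue_upcall, my_cookie, &einfo,
                              rqset, 1 /* async */, false /* speculative */);
#endif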
2892
2893 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2894                    struct ldlm_res_id *res_id, enum ldlm_type type,
2895                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2896                    __u64 *flags, struct osc_object *obj,
2897                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2898 {
2899         struct obd_device *obd = exp->exp_obd;
2900         __u64 lflags = *flags;
2901         enum ldlm_mode rc;
2902         ENTRY;
2903
2904         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2905                 RETURN(-EIO);
2906
2907         /* Filesystem lock extents are extended to page boundaries so that
2908          * dealing with the page cache is a little smoother */
2909         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2910         policy->l_extent.end |= ~PAGE_MASK;
2911
2912         /* Next, search for already existing extent locks that will cover us */
2913         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2914                                         res_id, type, policy, mode, lockh,
2915                                         match_flags);
2916         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2917                 RETURN(rc);
2918
2919         if (obj != NULL) {
2920                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2921
2922                 LASSERT(lock != NULL);
2923                 if (osc_set_lock_data(lock, obj)) {
2924                         lock_res_and_lock(lock);
2925                         if (!ldlm_is_lvb_cached(lock)) {
2926                                 LASSERT(lock->l_ast_data == obj);
2927                                 osc_lock_lvb_update(env, obj, lock, NULL);
2928                                 ldlm_set_lvb_cached(lock);
2929                         }
2930                         unlock_res_and_lock(lock);
2931                 } else {
2932                         ldlm_lock_decref(lockh, rc);
2933                         rc = 0;
2934                 }
2935                 LDLM_LOCK_PUT(lock);
2936         }
2937         RETURN(rc);
2938 }
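
/*
 * Illustrative sketch (compiled out): osc_match_base() returns the matched
 * lock mode (or 0 on no match); on success the caller owns a reference on
 * the lock and must drop it when done.  Variable names are hypothetical.
 */
#if 0
        mode = osc_match_base(env, exp, &res_id, LDLM_EXTENT, &policy,
                              LCK_PR | LCK_PW, &flags, obj, &lockh, 0);
        if (mode != 0) {
                /* ... IO covered by the matched lock ... */
                ldlm_lock_decref(&lockh, mode);
        }
#endif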
2939
2940 static int osc_statfs_interpret(const struct lu_env *env,
2941                                 struct ptlrpc_request *req, void *args, int rc)
2942 {
2943         struct osc_async_args *aa = args;
2944         struct obd_statfs *msfs;
2945
2946         ENTRY;
2947         if (rc == -EBADR)
2948                 /*
2949                  * The request has in fact never been sent due to issues at
2950                  * a higher level (LOV).  Exit immediately since the caller
2951                  * is aware of the problem and takes care of the clean up.
2952                  */
2953                 RETURN(rc);
2954
2955         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2956             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2957                 GOTO(out, rc = 0);
2958
2959         if (rc != 0)
2960                 GOTO(out, rc);
2961
2962         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2963         if (msfs == NULL)
2964                 GOTO(out, rc = -EPROTO);
2965
2966         *aa->aa_oi->oi_osfs = *msfs;
2967 out:
2968         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2969
2970         RETURN(rc);
2971 }
2972
2973 static int osc_statfs_async(struct obd_export *exp,
2974                             struct obd_info *oinfo, time64_t max_age,
2975                             struct ptlrpc_request_set *rqset)
2976 {
2977         struct obd_device     *obd = class_exp2obd(exp);
2978         struct ptlrpc_request *req;
2979         struct osc_async_args *aa;
2980         int rc;
2981         ENTRY;
2982
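        /* max_age is the oldest acceptable timestamp for the cached
         * obd_osfs, not an interval: the cache is used only if it was
         * refreshed at or after that cut-off */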
2983         if (obd->obd_osfs_age >= max_age) {
2984                 CDEBUG(D_SUPER,
2985                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2986                        obd->obd_name, &obd->obd_osfs,
2987                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2988                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2989                 spin_lock(&obd->obd_osfs_lock);
2990                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2991                 spin_unlock(&obd->obd_osfs_lock);
2992                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2993                 if (oinfo->oi_cb_up)
2994                         oinfo->oi_cb_up(oinfo, 0);
2995
2996                 RETURN(0);
2997         }
2998
2999         /* We could possibly pass max_age in the request (as an absolute
3000          * timestamp or a "seconds.usec ago") so the target can avoid doing
3001          * extra calls into the filesystem if that isn't necessary (e.g.
3002          * during mount that would help a bit).  Having relative timestamps
3003          * is not so great if request processing is slow, while absolute
3004          * timestamps are not ideal because they need time synchronization. */
3005         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3006         if (req == NULL)
3007                 RETURN(-ENOMEM);
3008
3009         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3010         if (rc) {
3011                 ptlrpc_request_free(req);
3012                 RETURN(rc);
3013         }
3014         ptlrpc_request_set_replen(req);
3015         req->rq_request_portal = OST_CREATE_PORTAL;
3016         ptlrpc_at_set_req_timeout(req);
3017
3018         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not block waiting for statfs,
                 * to avoid a deadlock */
3020                 req->rq_no_resend = 1;
3021                 req->rq_no_delay = 1;
3022         }
3023
3024         req->rq_interpret_reply = osc_statfs_interpret;
3025         aa = ptlrpc_req_async_args(aa, req);
3026         aa->aa_oi = oinfo;
3027
3028         ptlrpc_set_add_req(rqset, req);
3029         RETURN(0);
3030 }
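
/*
 * Illustrative sketch (compiled out): a caller fills an obd_info with a
 * result buffer and a completion callback, then lets the request set drive
 * the RPC.  All names except osc_statfs_async() are hypothetical.
 */
#if 0
        struct obd_info oinfo = {
                .oi_osfs  = &my_osfs,
                .oi_cb_up = my_statfs_done,
                .oi_flags = OBD_STATFS_NODELAY,
        };
        rc = osc_statfs_async(exp, &oinfo, max_age, rqset);
#endif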
3031
3032 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3033                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3034 {
3035         struct obd_device     *obd = class_exp2obd(exp);
3036         struct obd_statfs     *msfs;
3037         struct ptlrpc_request *req;
3038         struct obd_import     *imp = NULL;
3039         int rc;
3040         ENTRY;

        /* Since the request might also come from lprocfs, we need to
         * sync this with client_disconnect_export() (bug 15684). */
3045         down_read(&obd->u.cli.cl_sem);
3046         if (obd->u.cli.cl_import)
3047                 imp = class_import_get(obd->u.cli.cl_import);
3048         up_read(&obd->u.cli.cl_sem);
3049         if (!imp)
3050                 RETURN(-ENODEV);
3051
        /* See the comment in osc_statfs_async() above about possibly
         * passing max_age in the request. */
3058         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3059
3060         class_import_put(imp);
3061
3062         if (req == NULL)
3063                 RETURN(-ENOMEM);
3064
3065         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3066         if (rc) {
3067                 ptlrpc_request_free(req);
3068                 RETURN(rc);
3069         }
3070         ptlrpc_request_set_replen(req);
3071         req->rq_request_portal = OST_CREATE_PORTAL;
3072         ptlrpc_at_set_req_timeout(req);
3073
3074         if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not block waiting for statfs,
                 * to avoid a deadlock */
3076                 req->rq_no_resend = 1;
3077                 req->rq_no_delay = 1;
3078         }
3079
3080         rc = ptlrpc_queue_wait(req);
3081         if (rc)
3082                 GOTO(out, rc);
3083
3084         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3085         if (msfs == NULL)
3086                 GOTO(out, rc = -EPROTO);
3087
3088         *osfs = *msfs;
3089
3090         EXIT;
3091 out:
3092         ptlrpc_req_finished(req);
3093         return rc;
3094 }
3095
3096 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3097                          void *karg, void __user *uarg)
3098 {
3099         struct obd_device *obd = exp->exp_obd;
3100         struct obd_ioctl_data *data = karg;
3101         int rc = 0;
3102
3103         ENTRY;
3104         if (!try_module_get(THIS_MODULE)) {
3105                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3106                        module_name(THIS_MODULE));
3107                 return -EINVAL;
3108         }
3109         switch (cmd) {
3110         case OBD_IOC_CLIENT_RECOVER:
3111                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3112                                            data->ioc_inlbuf1, 0);
3113                 if (rc > 0)
3114                         rc = 0;
3115                 break;