Whamcloud - gitweb
LU-12275 sec: O_DIRECT for encrypted file
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50 #include <linux/falloc.h>
51
52 #include "osc_internal.h"
53
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
57
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
60 module_param(osc_reqpool_mem_max, uint, 0444);
61
62 static int osc_idle_timeout = 20;
63 module_param(osc_idle_timeout, uint, 0644);
64
65 #define osc_grant_args osc_brw_async_args
66
67 struct osc_setattr_args {
68         struct obdo             *sa_oa;
69         obd_enqueue_update_f     sa_upcall;
70         void                    *sa_cookie;
71 };
72
73 struct osc_fsync_args {
74         struct osc_object       *fa_obj;
75         struct obdo             *fa_oa;
76         obd_enqueue_update_f    fa_upcall;
77         void                    *fa_cookie;
78 };
79
80 struct osc_ladvise_args {
81         struct obdo             *la_oa;
82         obd_enqueue_update_f     la_upcall;
83         void                    *la_cookie;
84 };
85
86 static void osc_release_ppga(struct brw_page **ppga, size_t count);
87 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
88                          void *data, int rc);
89
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 {
92         struct ost_body *body;
93
94         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
95         LASSERT(body);
96
97         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
98 }
99
100 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
101                        struct obdo *oa)
102 {
103         struct ptlrpc_request   *req;
104         struct ost_body         *body;
105         int                      rc;
106
107         ENTRY;
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109         if (req == NULL)
110                 RETURN(-ENOMEM);
111
112         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113         if (rc) {
114                 ptlrpc_request_free(req);
115                 RETURN(rc);
116         }
117
118         osc_pack_req_body(req, oa);
119
120         ptlrpc_request_set_replen(req);
121
122         rc = ptlrpc_queue_wait(req);
123         if (rc)
124                 GOTO(out, rc);
125
126         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127         if (body == NULL)
128                 GOTO(out, rc = -EPROTO);
129
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
132
133         oa->o_blksize = cli_brw_size(exp->exp_obd);
134         oa->o_valid |= OBD_MD_FLBLKSZ;
135
136         EXIT;
137 out:
138         ptlrpc_req_finished(req);
139
140         return rc;
141 }
142
143 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
144                        struct obdo *oa)
145 {
146         struct ptlrpc_request   *req;
147         struct ost_body         *body;
148         int                      rc;
149
150         ENTRY;
151         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152
153         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154         if (req == NULL)
155                 RETURN(-ENOMEM);
156
157         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158         if (rc) {
159                 ptlrpc_request_free(req);
160                 RETURN(rc);
161         }
162
163         osc_pack_req_body(req, oa);
164
165         ptlrpc_request_set_replen(req);
166
167         rc = ptlrpc_queue_wait(req);
168         if (rc)
169                 GOTO(out, rc);
170
171         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172         if (body == NULL)
173                 GOTO(out, rc = -EPROTO);
174
175         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176
177         EXIT;
178 out:
179         ptlrpc_req_finished(req);
180
181         RETURN(rc);
182 }
183
184 static int osc_setattr_interpret(const struct lu_env *env,
185                                  struct ptlrpc_request *req, void *args, int rc)
186 {
187         struct osc_setattr_args *sa = args;
188         struct ost_body *body;
189
190         ENTRY;
191
192         if (rc != 0)
193                 GOTO(out, rc);
194
195         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196         if (body == NULL)
197                 GOTO(out, rc = -EPROTO);
198
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
200                              &body->oa);
201 out:
202         rc = sa->sa_upcall(sa->sa_cookie, rc);
203         RETURN(rc);
204 }
205
206 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
207                       obd_enqueue_update_f upcall, void *cookie,
208                       struct ptlrpc_request_set *rqset)
209 {
210         struct ptlrpc_request   *req;
211         struct osc_setattr_args *sa;
212         int                      rc;
213
214         ENTRY;
215
216         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217         if (req == NULL)
218                 RETURN(-ENOMEM);
219
220         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221         if (rc) {
222                 ptlrpc_request_free(req);
223                 RETURN(rc);
224         }
225
226         osc_pack_req_body(req, oa);
227
228         ptlrpc_request_set_replen(req);
229
230         /* do mds to ost setattr asynchronously */
231         if (!rqset) {
232                 /* Do not wait for response. */
233                 ptlrpcd_add_req(req);
234         } else {
235                 req->rq_interpret_reply = osc_setattr_interpret;
236
237                 sa = ptlrpc_req_async_args(sa, req);
238                 sa->sa_oa = oa;
239                 sa->sa_upcall = upcall;
240                 sa->sa_cookie = cookie;
241
242                 ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         ptlrpc_set_add_req(rqset, req);
328
329         RETURN(0);
330 }
331
332 static int osc_create(const struct lu_env *env, struct obd_export *exp,
333                       struct obdo *oa)
334 {
335         struct ptlrpc_request *req;
336         struct ost_body       *body;
337         int                    rc;
338         ENTRY;
339
340         LASSERT(oa != NULL);
341         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
342         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343
344         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345         if (req == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349         if (rc) {
350                 ptlrpc_request_free(req);
351                 GOTO(out, rc);
352         }
353
354         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
355         LASSERT(body);
356
357         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358
359         ptlrpc_request_set_replen(req);
360
361         rc = ptlrpc_queue_wait(req);
362         if (rc)
363                 GOTO(out_req, rc);
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL)
367                 GOTO(out_req, rc = -EPROTO);
368
369         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371
372         oa->o_blksize = cli_brw_size(exp->exp_obd);
373         oa->o_valid |= OBD_MD_FLBLKSZ;
374
375         CDEBUG(D_HA, "transno: %lld\n",
376                lustre_msg_get_transno(req->rq_repmsg));
377 out_req:
378         ptlrpc_req_finished(req);
379 out:
380         RETURN(rc);
381 }
382
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384                    obd_enqueue_update_f upcall, void *cookie)
385 {
386         struct ptlrpc_request *req;
387         struct osc_setattr_args *sa;
388         struct obd_import *imp = class_exp2cliimp(exp);
389         struct ost_body *body;
390         int rc;
391
392         ENTRY;
393
394         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395         if (req == NULL)
396                 RETURN(-ENOMEM);
397
398         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399         if (rc < 0) {
400                 ptlrpc_request_free(req);
401                 RETURN(rc);
402         }
403
404         osc_set_io_portal(req);
405
406         ptlrpc_at_set_req_timeout(req);
407
408         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409
410         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411
412         ptlrpc_request_set_replen(req);
413
414         req->rq_interpret_reply = osc_setattr_interpret;
415         sa = ptlrpc_req_async_args(sa, req);
416         sa->sa_oa = oa;
417         sa->sa_upcall = upcall;
418         sa->sa_cookie = cookie;
419
420         ptlrpcd_add_req(req);
421
422         RETURN(0);
423 }
424 EXPORT_SYMBOL(osc_punch_send);
425
426 /**
427  * osc_fallocate_base() - Handles fallocate request.
428  *
429  * @exp:        Export structure
430  * @oa:         Attributes passed to OSS from client (obdo structure)
431  * @upcall:     Primary & supplementary group information
432  * @cookie:     Exclusive identifier
433  * @rqset:      Request list.
434  * @mode:       Operation done on given range.
435  *
436  * osc_fallocate_base() - Handles fallocate requests only. Only block
437  * allocation or standard preallocate operation is supported currently.
438  * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
439  * is supported via SETATTR request.
440  *
441  * Return: Non-zero on failure and O on success.
442  */
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444                        obd_enqueue_update_f upcall, void *cookie, int mode)
445 {
446         struct ptlrpc_request *req;
447         struct osc_setattr_args *sa;
448         struct ost_body *body;
449         struct obd_import *imp = class_exp2cliimp(exp);
450         int rc;
451         ENTRY;
452
453         /*
454          * Only mode == 0 (which is standard prealloc) is supported now.
455          * Punch is not supported yet.
456          */
457         if (mode & ~FALLOC_FL_KEEP_SIZE)
458                 RETURN(-EOPNOTSUPP);
459         oa->o_falloc_mode = mode;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_OST_FALLOCATE);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
467         if (rc != 0) {
468                 ptlrpc_request_free(req);
469                 RETURN(rc);
470         }
471
472         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
473         LASSERT(body);
474
475         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
476
477         ptlrpc_request_set_replen(req);
478
479         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
480         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481         sa = ptlrpc_req_async_args(sa, req);
482         sa->sa_oa = oa;
483         sa->sa_upcall = upcall;
484         sa->sa_cookie = cookie;
485
486         ptlrpcd_add_req(req);
487
488         RETURN(0);
489 }
490
491 static int osc_sync_interpret(const struct lu_env *env,
492                               struct ptlrpc_request *req, void *args, int rc)
493 {
494         struct osc_fsync_args *fa = args;
495         struct ost_body *body;
496         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
497         unsigned long valid = 0;
498         struct cl_object *obj;
499         ENTRY;
500
501         if (rc != 0)
502                 GOTO(out, rc);
503
504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
505         if (body == NULL) {
506                 CERROR("can't unpack ost_body\n");
507                 GOTO(out, rc = -EPROTO);
508         }
509
510         *fa->fa_oa = body->oa;
511         obj = osc2cl(fa->fa_obj);
512
513         /* Update osc object's blocks attribute */
514         cl_object_attr_lock(obj);
515         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
516                 attr->cat_blocks = body->oa.o_blocks;
517                 valid |= CAT_BLOCKS;
518         }
519
520         if (valid != 0)
521                 cl_object_attr_update(env, obj, attr, valid);
522         cl_object_attr_unlock(obj);
523
524 out:
525         rc = fa->fa_upcall(fa->fa_cookie, rc);
526         RETURN(rc);
527 }
528
529 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
530                   obd_enqueue_update_f upcall, void *cookie,
531                   struct ptlrpc_request_set *rqset)
532 {
533         struct obd_export     *exp = osc_export(obj);
534         struct ptlrpc_request *req;
535         struct ost_body       *body;
536         struct osc_fsync_args *fa;
537         int                    rc;
538         ENTRY;
539
540         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
541         if (req == NULL)
542                 RETURN(-ENOMEM);
543
544         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
545         if (rc) {
546                 ptlrpc_request_free(req);
547                 RETURN(rc);
548         }
549
550         /* overload the size and blocks fields in the oa with start/end */
551         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
552         LASSERT(body);
553         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
554
555         ptlrpc_request_set_replen(req);
556         req->rq_interpret_reply = osc_sync_interpret;
557
558         fa = ptlrpc_req_async_args(fa, req);
559         fa->fa_obj = obj;
560         fa->fa_oa = oa;
561         fa->fa_upcall = upcall;
562         fa->fa_cookie = cookie;
563
564         ptlrpc_set_add_req(rqset, req);
565
566         RETURN (0);
567 }
568
569 /* Find and cancel locally locks matched by @mode in the resource found by
570  * @objid. Found locks are added into @cancel list. Returns the amount of
571  * locks added to @cancels list. */
572 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
573                                    struct list_head *cancels,
574                                    enum ldlm_mode mode, __u64 lock_flags)
575 {
576         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
577         struct ldlm_res_id res_id;
578         struct ldlm_resource *res;
579         int count;
580         ENTRY;
581
582         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
583          * export) but disabled through procfs (flag in NS).
584          *
585          * This distinguishes from a case when ELC is not supported originally,
586          * when we still want to cancel locks in advance and just cancel them
587          * locally, without sending any RPC. */
588         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
589                 RETURN(0);
590
591         ostid_build_res_name(&oa->o_oi, &res_id);
592         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
593         if (IS_ERR(res))
594                 RETURN(0);
595
596         LDLM_RESOURCE_ADDREF(res);
597         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
598                                            lock_flags, 0, NULL);
599         LDLM_RESOURCE_DELREF(res);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(const struct lu_env *env,
605                                  struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         wake_up(&cli->cl_destroy_waitq);
611
612         return 0;
613 }
614
615 static int osc_can_send_destroy(struct client_obd *cli)
616 {
617         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
618             cli->cl_max_rpcs_in_flight) {
619                 /* The destroy request can be sent */
620                 return 1;
621         }
622         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
623             cli->cl_max_rpcs_in_flight) {
624                 /*
625                  * The counter has been modified between the two atomic
626                  * operations.
627                  */
628                 wake_up(&cli->cl_destroy_waitq);
629         }
630         return 0;
631 }
632
633 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
634                        struct obdo *oa)
635 {
636         struct client_obd     *cli = &exp->exp_obd->u.cli;
637         struct ptlrpc_request *req;
638         struct ost_body       *body;
639         LIST_HEAD(cancels);
640         int rc, count;
641         ENTRY;
642
643         if (!oa) {
644                 CDEBUG(D_INFO, "oa NULL\n");
645                 RETURN(-EINVAL);
646         }
647
648         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
649                                         LDLM_FL_DISCARD_DATA);
650
651         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
652         if (req == NULL) {
653                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
654                 RETURN(-ENOMEM);
655         }
656
657         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
658                                0, &cancels, count);
659         if (rc) {
660                 ptlrpc_request_free(req);
661                 RETURN(rc);
662         }
663
664         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
665         ptlrpc_at_set_req_timeout(req);
666
667         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
668         LASSERT(body);
669         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
670
671         ptlrpc_request_set_replen(req);
672
673         req->rq_interpret_reply = osc_destroy_interpret;
674         if (!osc_can_send_destroy(cli)) {
675                 /*
676                  * Wait until the number of on-going destroy RPCs drops
677                  * under max_rpc_in_flight
678                  */
679                 rc = l_wait_event_abortable_exclusive(
680                         cli->cl_destroy_waitq,
681                         osc_can_send_destroy(cli));
682                 if (rc) {
683                         ptlrpc_req_finished(req);
684                         RETURN(-EINTR);
685                 }
686         }
687
688         /* Do not wait for response */
689         ptlrpcd_add_req(req);
690         RETURN(0);
691 }
692
693 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
694                                 long writing_bytes)
695 {
696         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
697
698         LASSERT(!(oa->o_valid & bits));
699
700         oa->o_valid |= bits;
701         spin_lock(&cli->cl_loi_list_lock);
702         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
703                 oa->o_dirty = cli->cl_dirty_grant;
704         else
705                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
706         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
707                 CERROR("dirty %lu > dirty_max %lu\n",
708                        cli->cl_dirty_pages,
709                        cli->cl_dirty_max_pages);
710                 oa->o_undirty = 0;
711         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
712                             (long)(obd_max_dirty_pages + 1))) {
713                 /* The atomic_read() allowing the atomic_inc() are
714                  * not covered by a lock thus they may safely race and trip
715                  * this CERROR() unless we add in a small fudge factor (+1). */
716                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
717                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long nrpages;
727                 unsigned long undirty;
728
729                 nrpages = cli->cl_max_pages_per_rpc;
730                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
731                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
732                 undirty = nrpages << PAGE_SHIFT;
733                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
734                                  GRANT_PARAM)) {
735                         int nrextents;
736
737                         /* take extent tax into account when asking for more
738                          * grant space */
739                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
740                                      cli->cl_max_extent_pages;
741                         undirty += nrextents * cli->cl_grant_extent_tax;
742                 }
743                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
744                  * to add extent tax, etc.
745                  */
746                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
747                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
748         }
749         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
750         oa->o_dropped = cli->cl_lost_grant;
751         cli->cl_lost_grant = 0;
752         spin_unlock(&cli->cl_loi_list_lock);
753         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
754                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
755 }
756
757 void osc_update_next_shrink(struct client_obd *cli)
758 {
759         cli->cl_next_shrink_grant = ktime_get_seconds() +
760                                     cli->cl_grant_shrink_interval;
761
762         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
763                cli->cl_next_shrink_grant);
764 }
765
766 static void __osc_update_grant(struct client_obd *cli, u64 grant)
767 {
768         spin_lock(&cli->cl_loi_list_lock);
769         cli->cl_avail_grant += grant;
770         spin_unlock(&cli->cl_loi_list_lock);
771 }
772
773 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 {
775         if (body->oa.o_valid & OBD_MD_FLGRANT) {
776                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
777                 __osc_update_grant(cli, body->oa.o_grant);
778         }
779 }
780
781 /**
782  * grant thread data for shrinking space.
783  */
784 struct grant_thread_data {
785         struct list_head        gtd_clients;
786         struct mutex            gtd_mutex;
787         unsigned long           gtd_stopped:1;
788 };
789 static struct grant_thread_data client_gtd;
790
791 static int osc_shrink_grant_interpret(const struct lu_env *env,
792                                       struct ptlrpc_request *req,
793                                       void *args, int rc)
794 {
795         struct osc_grant_args *aa = args;
796         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
797         struct ost_body *body;
798
799         if (rc != 0) {
800                 __osc_update_grant(cli, aa->aa_oa->o_grant);
801                 GOTO(out, rc);
802         }
803
804         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
805         LASSERT(body);
806         osc_update_grant(cli, body);
807 out:
808         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
809         aa->aa_oa = NULL;
810
811         return rc;
812 }
813
814 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
815 {
816         spin_lock(&cli->cl_loi_list_lock);
817         oa->o_grant = cli->cl_avail_grant / 4;
818         cli->cl_avail_grant -= oa->o_grant;
819         spin_unlock(&cli->cl_loi_list_lock);
820         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
821                 oa->o_valid |= OBD_MD_FLFLAGS;
822                 oa->o_flags = 0;
823         }
824         oa->o_flags |= OBD_FL_SHRINK_GRANT;
825         osc_update_next_shrink(cli);
826 }
827
828 /* Shrink the current grant, either from some large amount to enough for a
829  * full set of in-flight RPCs, or if we have already shrunk to that limit
830  * then to enough for a single RPC.  This avoids keeping more grant than
831  * needed, and avoids shrinking the grant piecemeal. */
832 static int osc_shrink_grant(struct client_obd *cli)
833 {
834         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
835                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
836
837         spin_lock(&cli->cl_loi_list_lock);
838         if (cli->cl_avail_grant <= target_bytes)
839                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
840         spin_unlock(&cli->cl_loi_list_lock);
841
842         return osc_shrink_grant_to_target(cli, target_bytes);
843 }
844
845 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
846 {
847         int                     rc = 0;
848         struct ost_body        *body;
849         ENTRY;
850
851         spin_lock(&cli->cl_loi_list_lock);
852         /* Don't shrink if we are already above or below the desired limit
853          * We don't want to shrink below a single RPC, as that will negatively
854          * impact block allocation and long-term performance. */
855         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
856                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
857
858         if (target_bytes >= cli->cl_avail_grant) {
859                 spin_unlock(&cli->cl_loi_list_lock);
860                 RETURN(0);
861         }
862         spin_unlock(&cli->cl_loi_list_lock);
863
864         OBD_ALLOC_PTR(body);
865         if (!body)
866                 RETURN(-ENOMEM);
867
868         osc_announce_cached(cli, &body->oa, 0);
869
870         spin_lock(&cli->cl_loi_list_lock);
871         if (target_bytes >= cli->cl_avail_grant) {
872                 /* available grant has changed since target calculation */
873                 spin_unlock(&cli->cl_loi_list_lock);
874                 GOTO(out_free, rc = 0);
875         }
876         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
877         cli->cl_avail_grant = target_bytes;
878         spin_unlock(&cli->cl_loi_list_lock);
879         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
880                 body->oa.o_valid |= OBD_MD_FLFLAGS;
881                 body->oa.o_flags = 0;
882         }
883         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
884         osc_update_next_shrink(cli);
885
886         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
887                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
888                                 sizeof(*body), body, NULL);
889         if (rc != 0)
890                 __osc_update_grant(cli, body->oa.o_grant);
891 out_free:
892         OBD_FREE_PTR(body);
893         RETURN(rc);
894 }
895
896 static int osc_should_shrink_grant(struct client_obd *client)
897 {
898         time64_t next_shrink = client->cl_next_shrink_grant;
899
900         if (client->cl_import == NULL)
901                 return 0;
902
903         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
904             client->cl_import->imp_grant_shrink_disabled) {
905                 osc_update_next_shrink(client);
906                 return 0;
907         }
908
909         if (ktime_get_seconds() >= next_shrink - 5) {
910                 /* Get the current RPC size directly, instead of going via:
911                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
912                  * Keep comment here so that it can be found by searching. */
913                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
914
915                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
916                     client->cl_avail_grant > brw_size)
917                         return 1;
918                 else
919                         osc_update_next_shrink(client);
920         }
921         return 0;
922 }
923
924 #define GRANT_SHRINK_RPC_BATCH  100
925
926 static struct delayed_work work;
927
928 static void osc_grant_work_handler(struct work_struct *data)
929 {
930         struct client_obd *cli;
931         int rpc_sent;
932         bool init_next_shrink = true;
933         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
934
935         rpc_sent = 0;
936         mutex_lock(&client_gtd.gtd_mutex);
937         list_for_each_entry(cli, &client_gtd.gtd_clients,
938                             cl_grant_chain) {
939                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
940                     osc_should_shrink_grant(cli)) {
941                         osc_shrink_grant(cli);
942                         rpc_sent++;
943                 }
944
945                 if (!init_next_shrink) {
946                         if (cli->cl_next_shrink_grant < next_shrink &&
947                             cli->cl_next_shrink_grant > ktime_get_seconds())
948                                 next_shrink = cli->cl_next_shrink_grant;
949                 } else {
950                         init_next_shrink = false;
951                         next_shrink = cli->cl_next_shrink_grant;
952                 }
953         }
954         mutex_unlock(&client_gtd.gtd_mutex);
955
956         if (client_gtd.gtd_stopped == 1)
957                 return;
958
959         if (next_shrink > ktime_get_seconds()) {
960                 time64_t delay = next_shrink - ktime_get_seconds();
961
962                 schedule_delayed_work(&work, cfs_time_seconds(delay));
963         } else {
964                 schedule_work(&work.work);
965         }
966 }
967
968 void osc_schedule_grant_work(void)
969 {
970         cancel_delayed_work_sync(&work);
971         schedule_work(&work.work);
972 }
973
974 /**
975  * Start grant thread for returing grant to server for idle clients.
976  */
977 static int osc_start_grant_work(void)
978 {
979         client_gtd.gtd_stopped = 0;
980         mutex_init(&client_gtd.gtd_mutex);
981         INIT_LIST_HEAD(&client_gtd.gtd_clients);
982
983         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
984         schedule_work(&work.work);
985
986         return 0;
987 }
988
989 static void osc_stop_grant_work(void)
990 {
991         client_gtd.gtd_stopped = 1;
992         cancel_delayed_work_sync(&work);
993 }
994
995 static void osc_add_grant_list(struct client_obd *client)
996 {
997         mutex_lock(&client_gtd.gtd_mutex);
998         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
999         mutex_unlock(&client_gtd.gtd_mutex);
1000 }
1001
1002 static void osc_del_grant_list(struct client_obd *client)
1003 {
1004         if (list_empty(&client->cl_grant_chain))
1005                 return;
1006
1007         mutex_lock(&client_gtd.gtd_mutex);
1008         list_del_init(&client->cl_grant_chain);
1009         mutex_unlock(&client_gtd.gtd_mutex);
1010 }
1011
1012 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1013 {
1014         /*
1015          * ocd_grant is the total grant amount we're expect to hold: if we've
1016          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1017          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1018          * dirty.
1019          *
1020          * race is tolerable here: if we're evicted, but imp_state already
1021          * left EVICTED state, then cl_dirty_pages must be 0 already.
1022          */
1023         spin_lock(&cli->cl_loi_list_lock);
1024         cli->cl_avail_grant = ocd->ocd_grant;
1025         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1026                 cli->cl_avail_grant -= cli->cl_reserved_grant;
1027                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1028                         cli->cl_avail_grant -= cli->cl_dirty_grant;
1029                 else
1030                         cli->cl_avail_grant -=
1031                                         cli->cl_dirty_pages << PAGE_SHIFT;
1032         }
1033
1034         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1035                 u64 size;
1036                 int chunk_mask;
1037
1038                 /* overhead for each extent insertion */
1039                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1040                 /* determine the appropriate chunk size used by osc_extent. */
1041                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1042                                           ocd->ocd_grant_blkbits);
1043                 /* max_pages_per_rpc must be chunk aligned */
1044                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1045                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1046                                              ~chunk_mask) & chunk_mask;
1047                 /* determine maximum extent size, in #pages */
1048                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1049                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
1050                 if (cli->cl_max_extent_pages == 0)
1051                         cli->cl_max_extent_pages = 1;
1052         } else {
1053                 cli->cl_grant_extent_tax = 0;
1054                 cli->cl_chunkbits = PAGE_SHIFT;
1055                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1056         }
1057         spin_unlock(&cli->cl_loi_list_lock);
1058
1059         CDEBUG(D_CACHE,
1060                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1061                cli_name(cli),
1062                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1063                cli->cl_max_extent_pages);
1064
1065         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1066                 osc_add_grant_list(cli);
1067 }
1068 EXPORT_SYMBOL(osc_init_grant);
1069
1070 /* We assume that the reason this OSC got a short read is because it read
1071  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1072  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1073  * this stripe never got written at or beyond this stripe offset yet. */
1074 static void handle_short_read(int nob_read, size_t page_count,
1075                               struct brw_page **pga)
1076 {
1077         char *ptr;
1078         int i = 0;
1079
1080         /* skip bytes read OK */
1081         while (nob_read > 0) {
1082                 LASSERT (page_count > 0);
1083
1084                 if (pga[i]->count > nob_read) {
1085                         /* EOF inside this page */
1086                         ptr = kmap(pga[i]->pg) +
1087                                 (pga[i]->off & ~PAGE_MASK);
1088                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1089                         kunmap(pga[i]->pg);
1090                         page_count--;
1091                         i++;
1092                         break;
1093                 }
1094
1095                 nob_read -= pga[i]->count;
1096                 page_count--;
1097                 i++;
1098         }
1099
1100         /* zero remaining pages */
1101         while (page_count-- > 0) {
1102                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1103                 memset(ptr, 0, pga[i]->count);
1104                 kunmap(pga[i]->pg);
1105                 i++;
1106         }
1107 }
1108
1109 static int check_write_rcs(struct ptlrpc_request *req,
1110                            int requested_nob, int niocount,
1111                            size_t page_count, struct brw_page **pga)
1112 {
1113         int     i;
1114         __u32   *remote_rcs;
1115
1116         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1117                                                   sizeof(*remote_rcs) *
1118                                                   niocount);
1119         if (remote_rcs == NULL) {
1120                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1121                 return(-EPROTO);
1122         }
1123
1124         /* return error if any niobuf was in error */
1125         for (i = 0; i < niocount; i++) {
1126                 if ((int)remote_rcs[i] < 0) {
1127                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1128                                i, remote_rcs[i], req);
1129                         return remote_rcs[i];
1130                 }
1131
1132                 if (remote_rcs[i] != 0) {
1133                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134                                 i, remote_rcs[i], req);
1135                         return(-EPROTO);
1136                 }
1137         }
1138         if (req->rq_bulk != NULL &&
1139             req->rq_bulk->bd_nob_transferred != requested_nob) {
1140                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141                        req->rq_bulk->bd_nob_transferred, requested_nob);
1142                 return(-EPROTO);
1143         }
1144
1145         return (0);
1146 }
1147
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1149 {
1150         if (p1->flag != p2->flag) {
1151                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1152                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1153                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1154
1155                 /* warn if we try to combine flags that we don't know to be
1156                  * safe to combine */
1157                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1158                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1159                               "report this at https://jira.whamcloud.com/\n",
1160                               p1->flag, p2->flag);
1161                 }
1162                 return 0;
1163         }
1164
1165         return (p1->off + p1->count == p2->off);
1166 }
1167
1168 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1169 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1170                                    size_t pg_count, struct brw_page **pga,
1171                                    int opc, obd_dif_csum_fn *fn,
1172                                    int sector_size,
1173                                    u32 *check_sum)
1174 {
1175         struct ahash_request *req;
1176         /* Used Adler as the default checksum type on top of DIF tags */
1177         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1178         struct page *__page;
1179         unsigned char *buffer;
1180         __u16 *guard_start;
1181         unsigned int bufsize;
1182         int guard_number;
1183         int used_number = 0;
1184         int used;
1185         u32 cksum;
1186         int rc = 0;
1187         int i = 0;
1188
1189         LASSERT(pg_count > 0);
1190
1191         __page = alloc_page(GFP_KERNEL);
1192         if (__page == NULL)
1193                 return -ENOMEM;
1194
1195         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1196         if (IS_ERR(req)) {
1197                 rc = PTR_ERR(req);
1198                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1199                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1200                 GOTO(out, rc);
1201         }
1202
1203         buffer = kmap(__page);
1204         guard_start = (__u16 *)buffer;
1205         guard_number = PAGE_SIZE / sizeof(*guard_start);
1206         while (nob > 0 && pg_count > 0) {
1207                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1208
1209                 /* corrupt the data before we compute the checksum, to
1210                  * simulate an OST->client data error */
1211                 if (unlikely(i == 0 && opc == OST_READ &&
1212                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1213                         unsigned char *ptr = kmap(pga[i]->pg);
1214                         int off = pga[i]->off & ~PAGE_MASK;
1215
1216                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1217                         kunmap(pga[i]->pg);
1218                 }
1219
1220                 /*
1221                  * The left guard number should be able to hold checksums of a
1222                  * whole page
1223                  */
1224                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1225                                                   pga[i]->off & ~PAGE_MASK,
1226                                                   count,
1227                                                   guard_start + used_number,
1228                                                   guard_number - used_number,
1229                                                   &used, sector_size,
1230                                                   fn);
1231                 if (rc)
1232                         break;
1233
1234                 used_number += used;
1235                 if (used_number == guard_number) {
1236                         cfs_crypto_hash_update_page(req, __page, 0,
1237                                 used_number * sizeof(*guard_start));
1238                         used_number = 0;
1239                 }
1240
1241                 nob -= pga[i]->count;
1242                 pg_count--;
1243                 i++;
1244         }
1245         kunmap(__page);
1246         if (rc)
1247                 GOTO(out, rc);
1248
1249         if (used_number != 0)
1250                 cfs_crypto_hash_update_page(req, __page, 0,
1251                         used_number * sizeof(*guard_start));
1252
1253         bufsize = sizeof(cksum);
1254         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1255
1256         /* For sending we only compute the wrong checksum instead
1257          * of corrupting the data so it is still correct on a redo */
1258         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1259                 cksum++;
1260
1261         *check_sum = cksum;
1262 out:
1263         __free_page(__page);
1264         return rc;
1265 }
1266 #else /* !CONFIG_CRC_T10DIF */
1267 #define obd_dif_ip_fn NULL
1268 #define obd_dif_crc_fn NULL
1269 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1270         -EOPNOTSUPP
1271 #endif /* CONFIG_CRC_T10DIF */
1272
1273 static int osc_checksum_bulk(int nob, size_t pg_count,
1274                              struct brw_page **pga, int opc,
1275                              enum cksum_types cksum_type,
1276                              u32 *cksum)
1277 {
1278         int                             i = 0;
1279         struct ahash_request           *req;
1280         unsigned int                    bufsize;
1281         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1282
1283         LASSERT(pg_count > 0);
1284
1285         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1286         if (IS_ERR(req)) {
1287                 CERROR("Unable to initialize checksum hash %s\n",
1288                        cfs_crypto_hash_name(cfs_alg));
1289                 return PTR_ERR(req);
1290         }
1291
1292         while (nob > 0 && pg_count > 0) {
1293                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1294
1295                 /* corrupt the data before we compute the checksum, to
1296                  * simulate an OST->client data error */
1297                 if (i == 0 && opc == OST_READ &&
1298                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1299                         unsigned char *ptr = kmap(pga[i]->pg);
1300                         int off = pga[i]->off & ~PAGE_MASK;
1301
1302                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1303                         kunmap(pga[i]->pg);
1304                 }
1305                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1306                                             pga[i]->off & ~PAGE_MASK,
1307                                             count);
1308                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1309                                (int)(pga[i]->off & ~PAGE_MASK));
1310
1311                 nob -= pga[i]->count;
1312                 pg_count--;
1313                 i++;
1314         }
1315
1316         bufsize = sizeof(*cksum);
1317         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1318
1319         /* For sending we only compute the wrong checksum instead
1320          * of corrupting the data so it is still correct on a redo */
1321         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1322                 (*cksum)++;
1323
1324         return 0;
1325 }
1326
1327 static int osc_checksum_bulk_rw(const char *obd_name,
1328                                 enum cksum_types cksum_type,
1329                                 int nob, size_t pg_count,
1330                                 struct brw_page **pga, int opc,
1331                                 u32 *check_sum)
1332 {
1333         obd_dif_csum_fn *fn = NULL;
1334         int sector_size = 0;
1335         int rc;
1336
1337         ENTRY;
1338         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1339
1340         if (fn)
1341                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1342                                              opc, fn, sector_size, check_sum);
1343         else
1344                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1345                                        check_sum);
1346
1347         RETURN(rc);
1348 }
1349
1350 static inline void osc_release_bounce_pages(struct brw_page **pga,
1351                                             u32 page_count)
1352 {
1353 #ifdef HAVE_LUSTRE_CRYPTO
1354         int i;
1355
1356         for (i = 0; i < page_count; i++) {
1357                 if (!pga[i]->pg->mapping)
1358                         /* bounce pages are unmapped */
1359                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1360                 pga[i]->count -= pga[i]->bp_count_diff;
1361                 pga[i]->off += pga[i]->bp_off_diff;
1362         }
1363 #endif
1364 }
1365
1366 static int
1367 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1368                      u32 page_count, struct brw_page **pga,
1369                      struct ptlrpc_request **reqp, int resend)
1370 {
1371         struct ptlrpc_request *req;
1372         struct ptlrpc_bulk_desc *desc;
1373         struct ost_body *body;
1374         struct obd_ioobj *ioobj;
1375         struct niobuf_remote *niobuf;
1376         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1377         struct osc_brw_async_args *aa;
1378         struct req_capsule *pill;
1379         struct brw_page *pg_prev;
1380         void *short_io_buf;
1381         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1382         struct inode *inode;
1383
1384         ENTRY;
1385         inode = page2inode(pga[0]->pg);
1386         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1387                 RETURN(-ENOMEM); /* Recoverable */
1388         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1389                 RETURN(-EINVAL); /* Fatal */
1390
1391         if ((cmd & OBD_BRW_WRITE) != 0) {
1392                 opc = OST_WRITE;
1393                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1394                                                 osc_rq_pool,
1395                                                 &RQF_OST_BRW_WRITE);
1396         } else {
1397                 opc = OST_READ;
1398                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1399         }
1400         if (req == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1404                 for (i = 0; i < page_count; i++) {
1405                         struct brw_page *pg = pga[i];
1406                         struct page *data_page = NULL;
1407                         bool retried = false;
1408                         bool lockedbymyself;
1409
1410 retry_encrypt:
1411                         /* The page can already be locked when we arrive here.
1412                          * This is possible when cl_page_assume/vvp_page_assume
1413                          * is stuck on wait_on_page_writeback with page lock
1414                          * held. In this case there is no risk for the lock to
1415                          * be released while we are doing our encryption
1416                          * processing, because writeback against that page will
1417                          * end in vvp_page_completion_write/cl_page_completion,
1418                          * which means only once the page is fully processed.
1419                          */
1420                         lockedbymyself = trylock_page(pg->pg);
1421                         data_page =
1422                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1423                                                                  PAGE_SIZE, 0,
1424                                                                  GFP_NOFS);
1425                         if (lockedbymyself)
1426                                 unlock_page(pg->pg);
1427                         if (IS_ERR(data_page)) {
1428                                 rc = PTR_ERR(data_page);
1429                                 if (rc == -ENOMEM && !retried) {
1430                                         retried = true;
1431                                         rc = 0;
1432                                         goto retry_encrypt;
1433                                 }
1434                                 ptlrpc_request_free(req);
1435                                 RETURN(rc);
1436                         }
1437                         pg->pg = data_page;
1438                         /* there should be no gap in the middle of page array */
1439                         if (i == page_count - 1) {
1440                                 struct osc_async_page *oap = brw_page2oap(pg);
1441
1442                                 oa->o_size = oap->oap_count +
1443                                         oap->oap_obj_off + oap->oap_page_off;
1444                         }
1445                         /* len is forced to PAGE_SIZE, and poff to 0
1446                          * so store the old, clear text info
1447                          */
1448                         pg->bp_count_diff = PAGE_SIZE - pg->count;
1449                         pg->count = PAGE_SIZE;
1450                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1451                         pg->off = pg->off & PAGE_MASK;
1452                 }
1453         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1454                 for (i = 0; i < page_count; i++) {
1455                         struct brw_page *pg = pga[i];
1456
1457                         /* count/off are forced to cover the whole page so that
1458                          * all encrypted data is stored on the OST, so adjust
1459                          * bp_{count,off}_diff for the size of the clear text.
1460                          */
1461                         pg->bp_count_diff = PAGE_SIZE - pg->count;
1462                         pg->count = PAGE_SIZE;
1463                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1464                         pg->off = pg->off & PAGE_MASK;
1465                 }
1466         }
1467
1468         for (niocount = i = 1; i < page_count; i++) {
1469                 if (!can_merge_pages(pga[i - 1], pga[i]))
1470                         niocount++;
1471         }
1472
1473         pill = &req->rq_pill;
1474         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1475                              sizeof(*ioobj));
1476         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1477                              niocount * sizeof(*niobuf));
1478
1479         for (i = 0; i < page_count; i++) {
1480                 short_io_size += pga[i]->count;
1481                 if (!inode || !IS_ENCRYPTED(inode)) {
1482                         pga[i]->bp_count_diff = 0;
1483                         pga[i]->bp_off_diff = 0;
1484                 }
1485         }
1486
1487         /* Check if read/write is small enough to be a short io. */
1488         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1489             !imp_connect_shortio(cli->cl_import))
1490                 short_io_size = 0;
1491
1492         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1493                              opc == OST_READ ? 0 : short_io_size);
1494         if (opc == OST_READ)
1495                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1496                                      short_io_size);
1497
1498         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1499         if (rc) {
1500                 ptlrpc_request_free(req);
1501                 RETURN(rc);
1502         }
1503         osc_set_io_portal(req);
1504
1505         ptlrpc_at_set_req_timeout(req);
1506         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1507          * retry logic */
1508         req->rq_no_retry_einprogress = 1;
1509
1510         if (short_io_size != 0) {
1511                 desc = NULL;
1512                 short_io_buf = NULL;
1513                 goto no_bulk;
1514         }
1515
1516         desc = ptlrpc_prep_bulk_imp(req, page_count,
1517                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1518                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1519                         PTLRPC_BULK_PUT_SINK),
1520                 OST_BULK_PORTAL,
1521                 &ptlrpc_bulk_kiov_pin_ops);
1522
1523         if (desc == NULL)
1524                 GOTO(out, rc = -ENOMEM);
1525         /* NB request now owns desc and will free it when it gets freed */
1526 no_bulk:
1527         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1528         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1529         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1530         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1531
1532         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1533
1534         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1535          * and from_kgid(), because they are asynchronous. Fortunately, variable
1536          * oa contains valid o_uid and o_gid in these two operations.
1537          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1538          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1539          * other process logic */
1540         body->oa.o_uid = oa->o_uid;
1541         body->oa.o_gid = oa->o_gid;
1542
1543         obdo_to_ioobj(oa, ioobj);
1544         ioobj->ioo_bufcnt = niocount;
1545         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1546          * that might be send for this request.  The actual number is decided
1547          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1548          * "max - 1" for old client compatibility sending "0", and also so the
1549          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1550         if (desc != NULL)
1551                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1552         else /* short io */
1553                 ioobj_max_brw_set(ioobj, 0);
1554
1555         if (short_io_size != 0) {
1556                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1557                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1558                         body->oa.o_flags = 0;
1559                 }
1560                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1561                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1562                        short_io_size);
1563                 if (opc == OST_WRITE) {
1564                         short_io_buf = req_capsule_client_get(pill,
1565                                                               &RMF_SHORT_IO);
1566                         LASSERT(short_io_buf != NULL);
1567                 }
1568         }
1569
1570         LASSERT(page_count > 0);
1571         pg_prev = pga[0];
1572         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1573                 struct brw_page *pg = pga[i];
1574                 int poff = pg->off & ~PAGE_MASK;
1575
1576                 LASSERT(pg->count > 0);
1577                 /* make sure there is no gap in the middle of page array */
1578                 LASSERTF(page_count == 1 ||
1579                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1580                           ergo(i > 0 && i < page_count - 1,
1581                                poff == 0 && pg->count == PAGE_SIZE)   &&
1582                           ergo(i == page_count - 1, poff == 0)),
1583                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1584                          i, page_count, pg, pg->off, pg->count);
1585                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1586                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1587                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1588                          i, page_count,
1589                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1590                          pg_prev->pg, page_private(pg_prev->pg),
1591                          pg_prev->pg->index, pg_prev->off);
1592                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1593                         (pg->flag & OBD_BRW_SRVLOCK));
1594                 if (short_io_size != 0 && opc == OST_WRITE) {
1595                         unsigned char *ptr = kmap_atomic(pg->pg);
1596
1597                         LASSERT(short_io_size >= requested_nob + pg->count);
1598                         memcpy(short_io_buf + requested_nob,
1599                                ptr + poff,
1600                                pg->count);
1601                         kunmap_atomic(ptr);
1602                 } else if (short_io_size == 0) {
1603                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1604                                                          pg->count);
1605                 }
1606                 requested_nob += pg->count;
1607
1608                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1609                         niobuf--;
1610                         niobuf->rnb_len += pg->count;
1611                 } else {
1612                         niobuf->rnb_offset = pg->off;
1613                         niobuf->rnb_len    = pg->count;
1614                         niobuf->rnb_flags  = pg->flag;
1615                 }
1616                 pg_prev = pg;
1617         }
1618
1619         LASSERTF((void *)(niobuf - niocount) ==
1620                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1621                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1622                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1623
1624         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1625         if (resend) {
1626                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1627                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1628                         body->oa.o_flags = 0;
1629                 }
1630                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1631         }
1632
1633         if (osc_should_shrink_grant(cli))
1634                 osc_shrink_grant_local(cli, &body->oa);
1635
1636         /* size[REQ_REC_OFF] still sizeof (*body) */
1637         if (opc == OST_WRITE) {
1638                 if (cli->cl_checksum &&
1639                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1640                         /* store cl_cksum_type in a local variable since
1641                          * it can be changed via lprocfs */
1642                         enum cksum_types cksum_type = cli->cl_cksum_type;
1643
1644                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1645                                 body->oa.o_flags = 0;
1646
1647                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1648                                                                 cksum_type);
1649                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1650
1651                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1652                                                   requested_nob, page_count,
1653                                                   pga, OST_WRITE,
1654                                                   &body->oa.o_cksum);
1655                         if (rc < 0) {
1656                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1657                                        rc);
1658                                 GOTO(out, rc);
1659                         }
1660                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1661                                body->oa.o_cksum);
1662
1663                         /* save this in 'oa', too, for later checking */
1664                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1665                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1666                                                            cksum_type);
1667                 } else {
1668                         /* clear out the checksum flag, in case this is a
1669                          * resend but cl_checksum is no longer set. b=11238 */
1670                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1671                 }
1672                 oa->o_cksum = body->oa.o_cksum;
1673                 /* 1 RC per niobuf */
1674                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1675                                      sizeof(__u32) * niocount);
1676         } else {
1677                 if (cli->cl_checksum &&
1678                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1679                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1680                                 body->oa.o_flags = 0;
1681                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1682                                 cli->cl_cksum_type);
1683                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1684                 }
1685
1686                 /* Client cksum has been already copied to wire obdo in previous
1687                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1688                  * resent due to cksum error, this will allow Server to
1689                  * check+dump pages on its side */
1690         }
1691         ptlrpc_request_set_replen(req);
1692
1693         aa = ptlrpc_req_async_args(aa, req);
1694         aa->aa_oa = oa;
1695         aa->aa_requested_nob = requested_nob;
1696         aa->aa_nio_count = niocount;
1697         aa->aa_page_count = page_count;
1698         aa->aa_resends = 0;
1699         aa->aa_ppga = pga;
1700         aa->aa_cli = cli;
1701         INIT_LIST_HEAD(&aa->aa_oaps);
1702
1703         *reqp = req;
1704         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1705         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1706                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1707                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1708         RETURN(0);
1709
1710  out:
1711         ptlrpc_req_finished(req);
1712         RETURN(rc);
1713 }
1714
1715 char dbgcksum_file_name[PATH_MAX];
1716
1717 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1718                                 struct brw_page **pga, __u32 server_cksum,
1719                                 __u32 client_cksum)
1720 {
1721         struct file *filp;
1722         int rc, i;
1723         unsigned int len;
1724         char *buf;
1725
1726         /* will only keep dump of pages on first error for the same range in
1727          * file/fid, not during the resends/retries. */
1728         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1729                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1730                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1731                   libcfs_debug_file_path_arr :
1732                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1733                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1734                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1735                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1736                  pga[0]->off,
1737                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1738                  client_cksum, server_cksum);
1739         filp = filp_open(dbgcksum_file_name,
1740                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1741         if (IS_ERR(filp)) {
1742                 rc = PTR_ERR(filp);
1743                 if (rc == -EEXIST)
1744                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1745                                "checksum error: rc = %d\n", dbgcksum_file_name,
1746                                rc);
1747                 else
1748                         CERROR("%s: can't open to dump pages with checksum "
1749                                "error: rc = %d\n", dbgcksum_file_name, rc);
1750                 return;
1751         }
1752
1753         for (i = 0; i < page_count; i++) {
1754                 len = pga[i]->count;
1755                 buf = kmap(pga[i]->pg);
1756                 while (len != 0) {
1757                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1758                         if (rc < 0) {
1759                                 CERROR("%s: wanted to write %u but got %d "
1760                                        "error\n", dbgcksum_file_name, len, rc);
1761                                 break;
1762                         }
1763                         len -= rc;
1764                         buf += rc;
1765                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1766                                dbgcksum_file_name, rc);
1767                 }
1768                 kunmap(pga[i]->pg);
1769         }
1770
1771         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1772         if (rc)
1773                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1774         filp_close(filp, NULL);
1775 }
1776
1777 static int
1778 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1779                      __u32 client_cksum, __u32 server_cksum,
1780                      struct osc_brw_async_args *aa)
1781 {
1782         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1783         enum cksum_types cksum_type;
1784         obd_dif_csum_fn *fn = NULL;
1785         int sector_size = 0;
1786         __u32 new_cksum;
1787         char *msg;
1788         int rc;
1789
1790         if (server_cksum == client_cksum) {
1791                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1792                 return 0;
1793         }
1794
1795         if (aa->aa_cli->cl_checksum_dump)
1796                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1797                                     server_cksum, client_cksum);
1798
1799         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1800                                            oa->o_flags : 0);
1801
1802         switch (cksum_type) {
1803         case OBD_CKSUM_T10IP512:
1804                 fn = obd_dif_ip_fn;
1805                 sector_size = 512;
1806                 break;
1807         case OBD_CKSUM_T10IP4K:
1808                 fn = obd_dif_ip_fn;
1809                 sector_size = 4096;
1810                 break;
1811         case OBD_CKSUM_T10CRC512:
1812                 fn = obd_dif_crc_fn;
1813                 sector_size = 512;
1814                 break;
1815         case OBD_CKSUM_T10CRC4K:
1816                 fn = obd_dif_crc_fn;
1817                 sector_size = 4096;
1818                 break;
1819         default:
1820                 break;
1821         }
1822
1823         if (fn)
1824                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1825                                              aa->aa_page_count, aa->aa_ppga,
1826                                              OST_WRITE, fn, sector_size,
1827                                              &new_cksum);
1828         else
1829                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1830                                        aa->aa_ppga, OST_WRITE, cksum_type,
1831                                        &new_cksum);
1832
1833         if (rc < 0)
1834                 msg = "failed to calculate the client write checksum";
1835         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1836                 msg = "the server did not use the checksum type specified in "
1837                       "the original request - likely a protocol problem";
1838         else if (new_cksum == server_cksum)
1839                 msg = "changed on the client after we checksummed it - "
1840                       "likely false positive due to mmap IO (bug 11742)";
1841         else if (new_cksum == client_cksum)
1842                 msg = "changed in transit before arrival at OST";
1843         else
1844                 msg = "changed in transit AND doesn't match the original - "
1845                       "likely false positive due to mmap IO (bug 11742)";
1846
1847         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1848                            DFID " object "DOSTID" extent [%llu-%llu], original "
1849                            "client csum %x (type %x), server csum %x (type %x),"
1850                            " client csum now %x\n",
1851                            obd_name, msg, libcfs_nid2str(peer->nid),
1852                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1853                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1854                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1855                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1856                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1857                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1858                            client_cksum,
1859                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1860                            server_cksum, cksum_type, new_cksum);
1861         return 1;
1862 }
1863
1864 /* Note rc enters this function as number of bytes transferred */
1865 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1866 {
1867         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1868         struct client_obd *cli = aa->aa_cli;
1869         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1870         const struct lnet_process_id *peer =
1871                 &req->rq_import->imp_connection->c_peer;
1872         struct ost_body *body;
1873         u32 client_cksum = 0;
1874         struct inode *inode;
1875
1876         ENTRY;
1877
1878         if (rc < 0 && rc != -EDQUOT) {
1879                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1880                 RETURN(rc);
1881         }
1882
1883         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1884         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1885         if (body == NULL) {
1886                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1887                 RETURN(-EPROTO);
1888         }
1889
1890         /* set/clear over quota flag for a uid/gid/projid */
1891         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1892             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1893                 unsigned qid[LL_MAXQUOTAS] = {
1894                                          body->oa.o_uid, body->oa.o_gid,
1895                                          body->oa.o_projid };
1896                 CDEBUG(D_QUOTA,
1897                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1898                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1899                        body->oa.o_valid, body->oa.o_flags);
1900                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1901                                        body->oa.o_flags);
1902         }
1903
1904         osc_update_grant(cli, body);
1905
1906         if (rc < 0)
1907                 RETURN(rc);
1908
1909         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1910                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1911
1912         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1913                 if (rc > 0) {
1914                         CERROR("%s: unexpected positive size %d\n",
1915                                obd_name, rc);
1916                         RETURN(-EPROTO);
1917                 }
1918
1919                 if (req->rq_bulk != NULL &&
1920                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1921                         RETURN(-EAGAIN);
1922
1923                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1924                     check_write_checksum(&body->oa, peer, client_cksum,
1925                                          body->oa.o_cksum, aa))
1926                         RETURN(-EAGAIN);
1927
1928                 rc = check_write_rcs(req, aa->aa_requested_nob,
1929                                      aa->aa_nio_count, aa->aa_page_count,
1930                                      aa->aa_ppga);
1931                 GOTO(out, rc);
1932         }
1933
1934         /* The rest of this function executes only for OST_READs */
1935
1936         if (req->rq_bulk == NULL) {
1937                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1938                                           RCL_SERVER);
1939                 LASSERT(rc == req->rq_status);
1940         } else {
1941                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1942                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1943         }
1944         if (rc < 0)
1945                 GOTO(out, rc = -EAGAIN);
1946
1947         if (rc > aa->aa_requested_nob) {
1948                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1949                        rc, aa->aa_requested_nob);
1950                 RETURN(-EPROTO);
1951         }
1952
1953         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1954                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1955                        rc, req->rq_bulk->bd_nob_transferred);
1956                 RETURN(-EPROTO);
1957         }
1958
1959         if (req->rq_bulk == NULL) {
1960                 /* short io */
1961                 int nob, pg_count, i = 0;
1962                 unsigned char *buf;
1963
1964                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1965                 pg_count = aa->aa_page_count;
1966                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1967                                                    rc);
1968                 nob = rc;
1969                 while (nob > 0 && pg_count > 0) {
1970                         unsigned char *ptr;
1971                         int count = aa->aa_ppga[i]->count > nob ?
1972                                     nob : aa->aa_ppga[i]->count;
1973
1974                         CDEBUG(D_CACHE, "page %p count %d\n",
1975                                aa->aa_ppga[i]->pg, count);
1976                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1977                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1978                                count);
1979                         kunmap_atomic((void *) ptr);
1980
1981                         buf += count;
1982                         nob -= count;
1983                         i++;
1984                         pg_count--;
1985                 }
1986         }
1987
1988         if (rc < aa->aa_requested_nob)
1989                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1990
1991         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1992                 static int cksum_counter;
1993                 u32        server_cksum = body->oa.o_cksum;
1994                 char      *via = "";
1995                 char      *router = "";
1996                 enum cksum_types cksum_type;
1997                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1998                         body->oa.o_flags : 0;
1999
2000                 cksum_type = obd_cksum_type_unpack(o_flags);
2001                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2002                                           aa->aa_page_count, aa->aa_ppga,
2003                                           OST_READ, &client_cksum);
2004                 if (rc < 0)
2005                         GOTO(out, rc);
2006
2007                 if (req->rq_bulk != NULL &&
2008                     peer->nid != req->rq_bulk->bd_sender) {
2009                         via = " via ";
2010                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2011                 }
2012
2013                 if (server_cksum != client_cksum) {
2014                         struct ost_body *clbody;
2015                         u32 page_count = aa->aa_page_count;
2016
2017                         clbody = req_capsule_client_get(&req->rq_pill,
2018                                                         &RMF_OST_BODY);
2019                         if (cli->cl_checksum_dump)
2020                                 dump_all_bulk_pages(&clbody->oa, page_count,
2021                                                     aa->aa_ppga, server_cksum,
2022                                                     client_cksum);
2023
2024                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2025                                            "%s%s%s inode "DFID" object "DOSTID
2026                                            " extent [%llu-%llu], client %x, "
2027                                            "server %x, cksum_type %x\n",
2028                                            obd_name,
2029                                            libcfs_nid2str(peer->nid),
2030                                            via, router,
2031                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2032                                                 clbody->oa.o_parent_seq : 0ULL,
2033                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2034                                                 clbody->oa.o_parent_oid : 0,
2035                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2036                                                 clbody->oa.o_parent_ver : 0,
2037                                            POSTID(&body->oa.o_oi),
2038                                            aa->aa_ppga[0]->off,
2039                                            aa->aa_ppga[page_count-1]->off +
2040                                            aa->aa_ppga[page_count-1]->count - 1,
2041                                            client_cksum, server_cksum,
2042                                            cksum_type);
2043                         cksum_counter = 0;
2044                         aa->aa_oa->o_cksum = client_cksum;
2045                         rc = -EAGAIN;
2046                 } else {
2047                         cksum_counter++;
2048                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2049                         rc = 0;
2050                 }
2051         } else if (unlikely(client_cksum)) {
2052                 static int cksum_missed;
2053
2054                 cksum_missed++;
2055                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2056                         CERROR("%s: checksum %u requested from %s but not sent\n",
2057                                obd_name, cksum_missed,
2058                                libcfs_nid2str(peer->nid));
2059         } else {
2060                 rc = 0;
2061         }
2062
2063         inode = page2inode(aa->aa_ppga[0]->pg);
2064         if (inode && IS_ENCRYPTED(inode)) {
2065                 int idx;
2066
2067                 if (!llcrypt_has_encryption_key(inode)) {
2068                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2069                         GOTO(out, rc);
2070                 }
2071                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2072                         struct brw_page *pg = aa->aa_ppga[idx];
2073
2074                         /* do not decrypt if page is all 0s */
2075                         if (memchr_inv(page_address(pg->pg), 0,
2076                                        PAGE_SIZE) == NULL) {
2077                                 /* if page is empty forward info to upper layers
2078                                  * (ll_io_zero_page) by clearing PagePrivate2
2079                                  */
2080                                 ClearPagePrivate2(pg->pg);
2081                                 continue;
2082                         }
2083
2084                         /* The page is already locked when we arrive here,
2085                          * except when we deal with a twisted page for
2086                          * specific Direct IO support, in which case
2087                          * PageChecked flag is set on page.
2088                          */
2089                         if (PageChecked(pg->pg))
2090                                 lock_page(pg->pg);
2091                         rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2092                                                               PAGE_SIZE, 0);
2093                         if (PageChecked(pg->pg))
2094                                 unlock_page(pg->pg);
2095                         if (rc)
2096                                 GOTO(out, rc);
2097                 }
2098         }
2099
2100 out:
2101         if (rc >= 0)
2102                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2103                                      aa->aa_oa, &body->oa);
2104
2105         RETURN(rc);
2106 }
2107
2108 static int osc_brw_redo_request(struct ptlrpc_request *request,
2109                                 struct osc_brw_async_args *aa, int rc)
2110 {
2111         struct ptlrpc_request *new_req;
2112         struct osc_brw_async_args *new_aa;
2113         struct osc_async_page *oap;
2114         ENTRY;
2115
2116         /* The below message is checked in replay-ost-single.sh test_8ae*/
2117         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2118                   "redo for recoverable error %d", rc);
2119
2120         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2121                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2122                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2123                                   aa->aa_ppga, &new_req, 1);
2124         if (rc)
2125                 RETURN(rc);
2126
2127         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2128                 if (oap->oap_request != NULL) {
2129                         LASSERTF(request == oap->oap_request,
2130                                  "request %p != oap_request %p\n",
2131                                  request, oap->oap_request);
2132                 }
2133         }
2134         /*
2135          * New request takes over pga and oaps from old request.
2136          * Note that copying a list_head doesn't work, need to move it...
2137          */
2138         aa->aa_resends++;
2139         new_req->rq_interpret_reply = request->rq_interpret_reply;
2140         new_req->rq_async_args = request->rq_async_args;
2141         new_req->rq_commit_cb = request->rq_commit_cb;
2142         /* cap resend delay to the current request timeout, this is similar to
2143          * what ptlrpc does (see after_reply()) */
2144         if (aa->aa_resends > new_req->rq_timeout)
2145                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2146         else
2147                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2148         new_req->rq_generation_set = 1;
2149         new_req->rq_import_generation = request->rq_import_generation;
2150
2151         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2152
2153         INIT_LIST_HEAD(&new_aa->aa_oaps);
2154         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2155         INIT_LIST_HEAD(&new_aa->aa_exts);
2156         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2157         new_aa->aa_resends = aa->aa_resends;
2158
2159         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2160                 if (oap->oap_request) {
2161                         ptlrpc_req_finished(oap->oap_request);
2162                         oap->oap_request = ptlrpc_request_addref(new_req);
2163                 }
2164         }
2165
2166         /* XXX: This code will run into problem if we're going to support
2167          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
2168          * and wait for all of them to be finished. We should inherit request
2169          * set from old request. */
2170         ptlrpcd_add_req(new_req);
2171
2172         DEBUG_REQ(D_INFO, new_req, "new request");
2173         RETURN(0);
2174 }
2175
2176 /*
2177  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2178  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2179  * fine for our small page arrays and doesn't require allocation.  its an
2180  * insertion sort that swaps elements that are strides apart, shrinking the
2181  * stride down until its '1' and the array is sorted.
2182  */
2183 static void sort_brw_pages(struct brw_page **array, int num)
2184 {
2185         int stride, i, j;
2186         struct brw_page *tmp;
2187
2188         if (num == 1)
2189                 return;
2190         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2191                 ;
2192
2193         do {
2194                 stride /= 3;
2195                 for (i = stride ; i < num ; i++) {
2196                         tmp = array[i];
2197                         j = i;
2198                         while (j >= stride && array[j - stride]->off > tmp->off) {
2199                                 array[j] = array[j - stride];
2200                                 j -= stride;
2201                         }
2202                         array[j] = tmp;
2203                 }
2204         } while (stride > 1);
2205 }
2206
2207 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2208 {
2209         LASSERT(ppga != NULL);
2210         OBD_FREE_PTR_ARRAY(ppga, count);
2211 }
2212
2213 static int brw_interpret(const struct lu_env *env,
2214                          struct ptlrpc_request *req, void *args, int rc)
2215 {
2216         struct osc_brw_async_args *aa = args;
2217         struct osc_extent *ext;
2218         struct osc_extent *tmp;
2219         struct client_obd *cli = aa->aa_cli;
2220         unsigned long transferred = 0;
2221
2222         ENTRY;
2223
2224         rc = osc_brw_fini_request(req, rc);
2225         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2226
2227         /* restore clear text pages */
2228         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2229
2230         /*
2231          * When server returns -EINPROGRESS, client should always retry
2232          * regardless of the number of times the bulk was resent already.
2233          */
2234         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2235                 if (req->rq_import_generation !=
2236                     req->rq_import->imp_generation) {
2237                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2238                                ""DOSTID", rc = %d.\n",
2239                                req->rq_import->imp_obd->obd_name,
2240                                POSTID(&aa->aa_oa->o_oi), rc);
2241                 } else if (rc == -EINPROGRESS ||
2242                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2243                         rc = osc_brw_redo_request(req, aa, rc);
2244                 } else {
2245                         CERROR("%s: too many resent retries for object: "
2246                                "%llu:%llu, rc = %d.\n",
2247                                req->rq_import->imp_obd->obd_name,
2248                                POSTID(&aa->aa_oa->o_oi), rc);
2249                 }
2250
2251                 if (rc == 0)
2252                         RETURN(0);
2253                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2254                         rc = -EIO;
2255         }
2256
2257         if (rc == 0) {
2258                 struct obdo *oa = aa->aa_oa;
2259                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2260                 unsigned long valid = 0;
2261                 struct cl_object *obj;
2262                 struct osc_async_page *last;
2263
2264                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2265                 obj = osc2cl(last->oap_obj);
2266
2267                 cl_object_attr_lock(obj);
2268                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2269                         attr->cat_blocks = oa->o_blocks;
2270                         valid |= CAT_BLOCKS;
2271                 }
2272                 if (oa->o_valid & OBD_MD_FLMTIME) {
2273                         attr->cat_mtime = oa->o_mtime;
2274                         valid |= CAT_MTIME;
2275                 }
2276                 if (oa->o_valid & OBD_MD_FLATIME) {
2277                         attr->cat_atime = oa->o_atime;
2278                         valid |= CAT_ATIME;
2279                 }
2280                 if (oa->o_valid & OBD_MD_FLCTIME) {
2281                         attr->cat_ctime = oa->o_ctime;
2282                         valid |= CAT_CTIME;
2283                 }
2284
2285                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2286                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2287                         loff_t last_off = last->oap_count + last->oap_obj_off +
2288                                 last->oap_page_off;
2289
2290                         /* Change file size if this is an out of quota or
2291                          * direct IO write and it extends the file size */
2292                         if (loi->loi_lvb.lvb_size < last_off) {
2293                                 attr->cat_size = last_off;
2294                                 valid |= CAT_SIZE;
2295                         }
2296                         /* Extend KMS if it's not a lockless write */
2297                         if (loi->loi_kms < last_off &&
2298                             oap2osc_page(last)->ops_srvlock == 0) {
2299                                 attr->cat_kms = last_off;
2300                                 valid |= CAT_KMS;
2301                         }
2302                 }
2303
2304                 if (valid != 0)
2305                         cl_object_attr_update(env, obj, attr, valid);
2306                 cl_object_attr_unlock(obj);
2307         }
2308         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2309         aa->aa_oa = NULL;
2310
2311         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2312                 osc_inc_unstable_pages(req);
2313
2314         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2315                 list_del_init(&ext->oe_link);
2316                 osc_extent_finish(env, ext, 1,
2317                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2318         }
2319         LASSERT(list_empty(&aa->aa_exts));
2320         LASSERT(list_empty(&aa->aa_oaps));
2321
2322         transferred = (req->rq_bulk == NULL ? /* short io */
2323                        aa->aa_requested_nob :
2324                        req->rq_bulk->bd_nob_transferred);
2325
2326         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2327         ptlrpc_lprocfs_brw(req, transferred);
2328
2329         spin_lock(&cli->cl_loi_list_lock);
2330         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2331          * is called so we know whether to go to sync BRWs or wait for more
2332          * RPCs to complete */
2333         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2334                 cli->cl_w_in_flight--;
2335         else
2336                 cli->cl_r_in_flight--;
2337         osc_wake_cache_waiters(cli);
2338         spin_unlock(&cli->cl_loi_list_lock);
2339
2340         osc_io_unplug(env, cli, NULL);
2341         RETURN(rc);
2342 }
2343
2344 static void brw_commit(struct ptlrpc_request *req)
2345 {
2346         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2347          * this called via the rq_commit_cb, I need to ensure
2348          * osc_dec_unstable_pages is still called. Otherwise unstable
2349          * pages may be leaked. */
2350         spin_lock(&req->rq_lock);
2351         if (likely(req->rq_unstable)) {
2352                 req->rq_unstable = 0;
2353                 spin_unlock(&req->rq_lock);
2354
2355                 osc_dec_unstable_pages(req);
2356         } else {
2357                 req->rq_committed = 1;
2358                 spin_unlock(&req->rq_lock);
2359         }
2360 }
2361
2362 /**
2363  * Build an RPC by the list of extent @ext_list. The caller must ensure
2364  * that the total pages in this list are NOT over max pages per RPC.
2365  * Extents in the list must be in OES_RPC state.
2366  */
2367 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2368                   struct list_head *ext_list, int cmd)
2369 {
2370         struct ptlrpc_request           *req = NULL;
2371         struct osc_extent               *ext;
2372         struct brw_page                 **pga = NULL;
2373         struct osc_brw_async_args       *aa = NULL;
2374         struct obdo                     *oa = NULL;
2375         struct osc_async_page           *oap;
2376         struct osc_object               *obj = NULL;
2377         struct cl_req_attr              *crattr = NULL;
2378         loff_t                          starting_offset = OBD_OBJECT_EOF;
2379         loff_t                          ending_offset = 0;
2380         /* '1' for consistency with code that checks !mpflag to restore */
2381         int mpflag = 1;
2382         int                             mem_tight = 0;
2383         int                             page_count = 0;
2384         bool                            soft_sync = false;
2385         bool                            ndelay = false;
2386         int                             i;
2387         int                             grant = 0;
2388         int                             rc;
2389         __u32                           layout_version = 0;
2390         LIST_HEAD(rpc_list);
2391         struct ost_body                 *body;
2392         ENTRY;
2393         LASSERT(!list_empty(ext_list));
2394
2395         /* add pages into rpc_list to build BRW rpc */
2396         list_for_each_entry(ext, ext_list, oe_link) {
2397                 LASSERT(ext->oe_state == OES_RPC);
2398                 mem_tight |= ext->oe_memalloc;
2399                 grant += ext->oe_grants;
2400                 page_count += ext->oe_nr_pages;
2401                 layout_version = max(layout_version, ext->oe_layout_version);
2402                 if (obj == NULL)
2403                         obj = ext->oe_obj;
2404         }
2405
2406         soft_sync = osc_over_unstable_soft_limit(cli);
2407         if (mem_tight)
2408                 mpflag = memalloc_noreclaim_save();
2409
2410         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2411         if (pga == NULL)
2412                 GOTO(out, rc = -ENOMEM);
2413
2414         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2415         if (oa == NULL)
2416                 GOTO(out, rc = -ENOMEM);
2417
2418         i = 0;
2419         list_for_each_entry(ext, ext_list, oe_link) {
2420                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2421                         if (mem_tight)
2422                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2423                         if (soft_sync)
2424                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2425                         pga[i] = &oap->oap_brw_page;
2426                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2427                         i++;
2428
2429                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2430                         if (starting_offset == OBD_OBJECT_EOF ||
2431                             starting_offset > oap->oap_obj_off)
2432                                 starting_offset = oap->oap_obj_off;
2433                         else
2434                                 LASSERT(oap->oap_page_off == 0);
2435                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2436                                 ending_offset = oap->oap_obj_off +
2437                                                 oap->oap_count;
2438                         else
2439                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2440                                         PAGE_SIZE);
2441                 }
2442                 if (ext->oe_ndelay)
2443                         ndelay = true;
2444         }
2445
2446         /* first page in the list */
2447         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2448
2449         crattr = &osc_env_info(env)->oti_req_attr;
2450         memset(crattr, 0, sizeof(*crattr));
2451         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2452         crattr->cra_flags = ~0ULL;
2453         crattr->cra_page = oap2cl_page(oap);
2454         crattr->cra_oa = oa;
2455         cl_req_attr_set(env, osc2cl(obj), crattr);
2456
2457         if (cmd == OBD_BRW_WRITE) {
2458                 oa->o_grant_used = grant;
2459                 if (layout_version > 0) {
2460                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2461                                PFID(&oa->o_oi.oi_fid), layout_version);
2462
2463                         oa->o_layout_version = layout_version;
2464                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2465                 }
2466         }
2467
2468         sort_brw_pages(pga, page_count);
2469         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2470         if (rc != 0) {
2471                 CERROR("prep_req failed: %d\n", rc);
2472                 GOTO(out, rc);
2473         }
2474
2475         req->rq_commit_cb = brw_commit;
2476         req->rq_interpret_reply = brw_interpret;
2477         req->rq_memalloc = mem_tight != 0;
2478         oap->oap_request = ptlrpc_request_addref(req);
2479         if (ndelay) {
2480                 req->rq_no_resend = req->rq_no_delay = 1;
2481                 /* probably set a shorter timeout value.
2482                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2483                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2484         }
2485
2486         /* Need to update the timestamps after the request is built in case
2487          * we race with setattr (locally or in queue at OST).  If OST gets
2488          * later setattr before earlier BRW (as determined by the request xid),
2489          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2490          * way to do this in a single call.  bug 10150 */
2491         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2492         crattr->cra_oa = &body->oa;
2493         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2494         cl_req_attr_set(env, osc2cl(obj), crattr);
2495         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2496
2497         aa = ptlrpc_req_async_args(aa, req);
2498         INIT_LIST_HEAD(&aa->aa_oaps);
2499         list_splice_init(&rpc_list, &aa->aa_oaps);
2500         INIT_LIST_HEAD(&aa->aa_exts);
2501         list_splice_init(ext_list, &aa->aa_exts);
2502
2503         spin_lock(&cli->cl_loi_list_lock);
2504         starting_offset >>= PAGE_SHIFT;
2505         if (cmd == OBD_BRW_READ) {
2506                 cli->cl_r_in_flight++;
2507                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2508                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2509                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2510                                       starting_offset + 1);
2511         } else {
2512                 cli->cl_w_in_flight++;
2513                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2514                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2515                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2516                                       starting_offset + 1);
2517         }
2518         spin_unlock(&cli->cl_loi_list_lock);
2519
2520         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2521                   page_count, aa, cli->cl_r_in_flight,
2522                   cli->cl_w_in_flight);
2523         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2524
2525         ptlrpcd_add_req(req);
2526         rc = 0;
2527         EXIT;
2528
2529 out:
2530         if (mem_tight)
2531                 memalloc_noreclaim_restore(mpflag);
2532
2533         if (rc != 0) {
2534                 LASSERT(req == NULL);
2535
2536                 if (oa)
2537                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2538                 if (pga) {
2539                         osc_release_bounce_pages(pga, page_count);
2540                         osc_release_ppga(pga, page_count);
2541                 }
2542                 /* this should happen rarely and is pretty bad, it makes the
2543                  * pending list not follow the dirty order */
2544                 while (!list_empty(ext_list)) {
2545                         ext = list_entry(ext_list->next, struct osc_extent,
2546                                          oe_link);
2547                         list_del_init(&ext->oe_link);
2548                         osc_extent_finish(env, ext, 0, rc);
2549                 }
2550         }
2551         RETURN(rc);
2552 }
2553
2554 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2555 {
2556         int set = 0;
2557
2558         LASSERT(lock != NULL);
2559
2560         lock_res_and_lock(lock);
2561
2562         if (lock->l_ast_data == NULL)
2563                 lock->l_ast_data = data;
2564         if (lock->l_ast_data == data)
2565                 set = 1;
2566
2567         unlock_res_and_lock(lock);
2568
2569         return set;
2570 }
2571
2572 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2573                      void *cookie, struct lustre_handle *lockh,
2574                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2575                      int errcode)
2576 {
2577         bool intent = *flags & LDLM_FL_HAS_INTENT;
2578         int rc;
2579         ENTRY;
2580
2581         /* The request was created before ldlm_cli_enqueue call. */
2582         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2583                 struct ldlm_reply *rep;
2584
2585                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2586                 LASSERT(rep != NULL);
2587
2588                 rep->lock_policy_res1 =
2589                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2590                 if (rep->lock_policy_res1)
2591                         errcode = rep->lock_policy_res1;
2592                 if (!speculative)
2593                         *flags |= LDLM_FL_LVB_READY;
2594         } else if (errcode == ELDLM_OK) {
2595                 *flags |= LDLM_FL_LVB_READY;
2596         }
2597
2598         /* Call the update callback. */
2599         rc = (*upcall)(cookie, lockh, errcode);
2600
2601         /* release the reference taken in ldlm_cli_enqueue() */
2602         if (errcode == ELDLM_LOCK_MATCHED)
2603                 errcode = ELDLM_OK;
2604         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2605                 ldlm_lock_decref(lockh, mode);
2606
2607         RETURN(rc);
2608 }
2609
2610 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2611                           void *args, int rc)
2612 {
2613         struct osc_enqueue_args *aa = args;
2614         struct ldlm_lock *lock;
2615         struct lustre_handle *lockh = &aa->oa_lockh;
2616         enum ldlm_mode mode = aa->oa_mode;
2617         struct ost_lvb *lvb = aa->oa_lvb;
2618         __u32 lvb_len = sizeof(*lvb);
2619         __u64 flags = 0;
2620
2621         ENTRY;
2622
2623         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2624          * be valid. */
2625         lock = ldlm_handle2lock(lockh);
2626         LASSERTF(lock != NULL,
2627                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2628                  lockh->cookie, req, aa);
2629
2630         /* Take an additional reference so that a blocking AST that
2631          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2632          * to arrive after an upcall has been executed by
2633          * osc_enqueue_fini(). */
2634         ldlm_lock_addref(lockh, mode);
2635
2636         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2637         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2638
2639         /* Let CP AST to grant the lock first. */
2640         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2641
2642         if (aa->oa_speculative) {
2643                 LASSERT(aa->oa_lvb == NULL);
2644                 LASSERT(aa->oa_flags == NULL);
2645                 aa->oa_flags = &flags;
2646         }
2647
2648         /* Complete obtaining the lock procedure. */
2649         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2650                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2651                                    lockh, rc);
2652         /* Complete osc stuff. */
2653         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2654                               aa->oa_flags, aa->oa_speculative, rc);
2655
2656         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2657
2658         ldlm_lock_decref(lockh, mode);
2659         LDLM_LOCK_PUT(lock);
2660         RETURN(rc);
2661 }
2662
2663 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2664  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2665  * other synchronous requests, however keeping some locks and trying to obtain
2666  * others may take a considerable amount of time in a case of ost failure; and
2667  * when other sync requests do not get released lock from a client, the client
2668  * is evicted from the cluster -- such scenarious make the life difficult, so
2669  * release locks just after they are obtained. */
2670 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2671                      __u64 *flags, union ldlm_policy_data *policy,
2672                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2673                      void *cookie, struct ldlm_enqueue_info *einfo,
2674                      struct ptlrpc_request_set *rqset, int async,
2675                      bool speculative)
2676 {
2677         struct obd_device *obd = exp->exp_obd;
2678         struct lustre_handle lockh = { 0 };
2679         struct ptlrpc_request *req = NULL;
2680         int intent = *flags & LDLM_FL_HAS_INTENT;
2681         __u64 match_flags = *flags;
2682         enum ldlm_mode mode;
2683         int rc;
2684         ENTRY;
2685
2686         /* Filesystem lock extents are extended to page boundaries so that
2687          * dealing with the page cache is a little smoother.  */
2688         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2689         policy->l_extent.end |= ~PAGE_MASK;
2690
2691         /* Next, search for already existing extent locks that will cover us */
2692         /* If we're trying to read, we also search for an existing PW lock.  The
2693          * VFS and page cache already protect us locally, so lots of readers/
2694          * writers can share a single PW lock.
2695          *
2696          * There are problems with conversion deadlocks, so instead of
2697          * converting a read lock to a write lock, we'll just enqueue a new
2698          * one.
2699          *
2700          * At some point we should cancel the read lock instead of making them
2701          * send us a blocking callback, but there are problems with canceling
2702          * locks out from other users right now, too. */
2703         mode = einfo->ei_mode;
2704         if (einfo->ei_mode == LCK_PR)
2705                 mode |= LCK_PW;
2706         /* Normal lock requests must wait for the LVB to be ready before
2707          * matching a lock; speculative lock requests do not need to,
2708          * because they will not actually use the lock. */
2709         if (!speculative)
2710                 match_flags |= LDLM_FL_LVB_READY;
2711         if (intent != 0)
2712                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2713         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2714                                einfo->ei_type, policy, mode, &lockh, 0);
2715         if (mode) {
2716                 struct ldlm_lock *matched;
2717
2718                 if (*flags & LDLM_FL_TEST_LOCK)
2719                         RETURN(ELDLM_OK);
2720
2721                 matched = ldlm_handle2lock(&lockh);
2722                 if (speculative) {
2723                         /* This DLM lock request is speculative, and does not
2724                          * have an associated IO request. Therefore if there
2725                          * is already a DLM lock, it wll just inform the
2726                          * caller to cancel the request for this stripe.*/
2727                         lock_res_and_lock(matched);
2728                         if (ldlm_extent_equal(&policy->l_extent,
2729                             &matched->l_policy_data.l_extent))
2730                                 rc = -EEXIST;
2731                         else
2732                                 rc = -ECANCELED;
2733                         unlock_res_and_lock(matched);
2734
2735                         ldlm_lock_decref(&lockh, mode);
2736                         LDLM_LOCK_PUT(matched);
2737                         RETURN(rc);
2738                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2739                         *flags |= LDLM_FL_LVB_READY;
2740
2741                         /* We already have a lock, and it's referenced. */
2742                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2743
2744                         ldlm_lock_decref(&lockh, mode);
2745                         LDLM_LOCK_PUT(matched);
2746                         RETURN(ELDLM_OK);
2747                 } else {
2748                         ldlm_lock_decref(&lockh, mode);
2749                         LDLM_LOCK_PUT(matched);
2750                 }
2751         }
2752
2753         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2754                 RETURN(-ENOLCK);
2755
2756         if (intent) {
2757                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2758                                            &RQF_LDLM_ENQUEUE_LVB);
2759                 if (req == NULL)
2760                         RETURN(-ENOMEM);
2761
2762                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2763                 if (rc) {
2764                         ptlrpc_request_free(req);
2765                         RETURN(rc);
2766                 }
2767
2768                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2769                                      sizeof *lvb);
2770                 ptlrpc_request_set_replen(req);
2771         }
2772
2773         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2774         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2775
2776         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2777                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2778         if (async) {
2779                 if (!rc) {
2780                         struct osc_enqueue_args *aa;
2781                         aa = ptlrpc_req_async_args(aa, req);
2782                         aa->oa_exp         = exp;
2783                         aa->oa_mode        = einfo->ei_mode;
2784                         aa->oa_type        = einfo->ei_type;
2785                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2786                         aa->oa_upcall      = upcall;
2787                         aa->oa_cookie      = cookie;
2788                         aa->oa_speculative = speculative;
2789                         if (!speculative) {
2790                                 aa->oa_flags  = flags;
2791                                 aa->oa_lvb    = lvb;
2792                         } else {
2793                                 /* speculative locks are essentially to enqueue
2794                                  * a DLM lock  in advance, so we don't care
2795                                  * about the result of the enqueue. */
2796                                 aa->oa_lvb    = NULL;
2797                                 aa->oa_flags  = NULL;
2798                         }
2799
2800                         req->rq_interpret_reply = osc_enqueue_interpret;
2801                         ptlrpc_set_add_req(rqset, req);
2802                 } else if (intent) {
2803                         ptlrpc_req_finished(req);
2804                 }
2805                 RETURN(rc);
2806         }
2807
2808         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2809                               flags, speculative, rc);
2810         if (intent)
2811                 ptlrpc_req_finished(req);
2812
2813         RETURN(rc);
2814 }
2815
2816 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2817                    struct ldlm_res_id *res_id, enum ldlm_type type,
2818                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2819                    __u64 *flags, struct osc_object *obj,
2820                    struct lustre_handle *lockh, int unref)
2821 {
2822         struct obd_device *obd = exp->exp_obd;
2823         __u64 lflags = *flags;
2824         enum ldlm_mode rc;
2825         ENTRY;
2826
2827         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2828                 RETURN(-EIO);
2829
2830         /* Filesystem lock extents are extended to page boundaries so that
2831          * dealing with the page cache is a little smoother */
2832         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2833         policy->l_extent.end |= ~PAGE_MASK;
2834
2835         /* Next, search for already existing extent locks that will cover us */
2836         /* If we're trying to read, we also search for an existing PW lock.  The
2837          * VFS and page cache already protect us locally, so lots of readers/
2838          * writers can share a single PW lock. */
2839         rc = mode;
2840         if (mode == LCK_PR)
2841                 rc |= LCK_PW;
2842         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2843                              res_id, type, policy, rc, lockh, unref);
2844         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2845                 RETURN(rc);
2846
2847         if (obj != NULL) {
2848                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2849
2850                 LASSERT(lock != NULL);
2851                 if (osc_set_lock_data(lock, obj)) {
2852                         lock_res_and_lock(lock);
2853                         if (!ldlm_is_lvb_cached(lock)) {
2854                                 LASSERT(lock->l_ast_data == obj);
2855                                 osc_lock_lvb_update(env, obj, lock, NULL);
2856                                 ldlm_set_lvb_cached(lock);
2857                         }
2858                         unlock_res_and_lock(lock);
2859                 } else {
2860                         ldlm_lock_decref(lockh, rc);
2861                         rc = 0;
2862                 }
2863                 LDLM_LOCK_PUT(lock);
2864         }
2865         RETURN(rc);
2866 }
2867
2868 static int osc_statfs_interpret(const struct lu_env *env,
2869                                 struct ptlrpc_request *req, void *args, int rc)
2870 {
2871         struct osc_async_args *aa = args;
2872         struct obd_statfs *msfs;
2873
2874         ENTRY;
2875         if (rc == -EBADR)
2876                 /*
2877                  * The request has in fact never been sent due to issues at
2878                  * a higher level (LOV).  Exit immediately since the caller
2879                  * is aware of the problem and takes care of the clean up.
2880                  */
2881                 RETURN(rc);
2882
2883         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2884             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2885                 GOTO(out, rc = 0);
2886
2887         if (rc != 0)
2888                 GOTO(out, rc);
2889
2890         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2891         if (msfs == NULL)
2892                 GOTO(out, rc = -EPROTO);
2893
2894         *aa->aa_oi->oi_osfs = *msfs;
2895 out:
2896         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2897
2898         RETURN(rc);
2899 }
2900
2901 static int osc_statfs_async(struct obd_export *exp,
2902                             struct obd_info *oinfo, time64_t max_age,
2903                             struct ptlrpc_request_set *rqset)
2904 {
2905         struct obd_device     *obd = class_exp2obd(exp);
2906         struct ptlrpc_request *req;
2907         struct osc_async_args *aa;
2908         int rc;
2909         ENTRY;
2910
2911         if (obd->obd_osfs_age >= max_age) {
2912                 CDEBUG(D_SUPER,
2913                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2914                        obd->obd_name, &obd->obd_osfs,
2915                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2916                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2917                 spin_lock(&obd->obd_osfs_lock);
2918                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2919                 spin_unlock(&obd->obd_osfs_lock);
2920                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2921                 if (oinfo->oi_cb_up)
2922                         oinfo->oi_cb_up(oinfo, 0);
2923
2924                 RETURN(0);
2925         }
2926
2927         /* We could possibly pass max_age in the request (as an absolute
2928          * timestamp or a "seconds.usec ago") so the target can avoid doing
2929          * extra calls into the filesystem if that isn't necessary (e.g.
2930          * during mount that would help a bit).  Having relative timestamps
2931          * is not so great if request processing is slow, while absolute
2932          * timestamps are not ideal because they need time synchronization. */
2933         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2934         if (req == NULL)
2935                 RETURN(-ENOMEM);
2936
2937         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2938         if (rc) {
2939                 ptlrpc_request_free(req);
2940                 RETURN(rc);
2941         }
2942         ptlrpc_request_set_replen(req);
2943         req->rq_request_portal = OST_CREATE_PORTAL;
2944         ptlrpc_at_set_req_timeout(req);
2945
2946         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2947                 /* procfs requests not want stat in wait for avoid deadlock */
2948                 req->rq_no_resend = 1;
2949                 req->rq_no_delay = 1;
2950         }
2951
2952         req->rq_interpret_reply = osc_statfs_interpret;
2953         aa = ptlrpc_req_async_args(aa, req);
2954         aa->aa_oi = oinfo;
2955
2956         ptlrpc_set_add_req(rqset, req);
2957         RETURN(0);
2958 }
2959
2960 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2961                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2962 {
2963         struct obd_device     *obd = class_exp2obd(exp);
2964         struct obd_statfs     *msfs;
2965         struct ptlrpc_request *req;
2966         struct obd_import     *imp = NULL;
2967         int rc;
2968         ENTRY;
2969
2970
2971         /*Since the request might also come from lprocfs, so we need
2972          *sync this with client_disconnect_export Bug15684*/
2973         down_read(&obd->u.cli.cl_sem);
2974         if (obd->u.cli.cl_import)
2975                 imp = class_import_get(obd->u.cli.cl_import);
2976         up_read(&obd->u.cli.cl_sem);
2977         if (!imp)
2978                 RETURN(-ENODEV);
2979
2980         /* We could possibly pass max_age in the request (as an absolute
2981          * timestamp or a "seconds.usec ago") so the target can avoid doing
2982          * extra calls into the filesystem if that isn't necessary (e.g.
2983          * during mount that would help a bit).  Having relative timestamps
2984          * is not so great if request processing is slow, while absolute
2985          * timestamps are not ideal because they need time synchronization. */
2986         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2987
2988         class_import_put(imp);
2989
2990         if (req == NULL)
2991                 RETURN(-ENOMEM);
2992
2993         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2994         if (rc) {
2995                 ptlrpc_request_free(req);
2996                 RETURN(rc);
2997         }
2998         ptlrpc_request_set_replen(req);
2999         req->rq_request_portal = OST_CREATE_PORTAL;
3000         ptlrpc_at_set_req_timeout(req);
3001
3002         if (flags & OBD_STATFS_NODELAY) {
3003                 /* procfs requests not want stat in wait for avoid deadlock */
3004                 req->rq_no_resend = 1;
3005                 req->rq_no_delay = 1;
3006         }
3007
3008         rc = ptlrpc_queue_wait(req);
3009         if (rc)
3010                 GOTO(out, rc);
3011
3012         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3013         if (msfs == NULL)
3014                 GOTO(out, rc = -EPROTO);
3015
3016         *osfs = *msfs;
3017
3018         EXIT;
3019 out:
3020         ptlrpc_req_finished(req);
3021         return rc;
3022 }
3023
3024 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3025                          void *karg, void __user *uarg)
3026 {
3027         struct obd_device *obd = exp->exp_obd;
3028         struct obd_ioctl_data *data = karg;
3029         int rc = 0;
3030
3031         ENTRY;
3032         if (!try_module_get(THIS_MODULE)) {
3033                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3034                        module_name(THIS_MODULE));
3035                 return -EINVAL;
3036         }
3037         switch (cmd) {
3038         case OBD_IOC_CLIENT_RECOVER:
3039                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3040                                            data->ioc_inlbuf1, 0);
3041                 if (rc > 0)
3042                         rc = 0;
3043                 break;
3044         case IOC_OSC_SET_ACTIVE:
3045                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3046                                               data->ioc_offset);
3047                 break;
3048         default:
3049                 rc = -ENOTTY;
3050                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3051                        obd->obd_name, cmd, current->comm, rc);
3052                 break;
3053         }
3054
3055         module_put(THIS_MODULE);
3056         return rc;
3057 }
3058
3059 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3060                        u32 keylen, void *key, u32 vallen, void *val,
3061                        struct ptlrpc_request_set *set)
3062 {
3063         struct ptlrpc_request *req;
3064         struct obd_device     *obd = exp->exp_obd;
3065         struct obd_import     *imp = class_exp2cliimp(exp);
3066         char                  *tmp;
3067         int                    rc;
3068         ENTRY;
3069
3070         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3071
3072         if (KEY_IS(KEY_CHECKSUM)) {
3073                 if (vallen != sizeof(int))
3074                         RETURN(-EINVAL);
3075                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3076                 RETURN(0);
3077         }
3078
3079         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3080                 sptlrpc_conf_client_adapt(obd);
3081                 RETURN(0);
3082         }
3083
3084         if (KEY_IS(KEY_FLUSH_CTX)) {
3085                 sptlrpc_import_flush_my_ctx(imp);
3086                 RETURN(0);
3087         }
3088
3089         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3090                 struct client_obd *cli = &obd->u.cli;
3091                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3092                 long target = *(long *)val;
3093
3094                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3095                 *(long *)val -= nr;
3096                 RETURN(0);
3097         }
3098
3099         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3100                 RETURN(-EINVAL);
3101
3102         /* We pass all other commands directly to OST. Since nobody calls osc
3103            methods directly and everybody is supposed to go through LOV, we
3104            assume lov checked invalid values for us.
3105            The only recognised values so far are evict_by_nid and mds_conn.
3106            Even if something bad goes through, we'd get a -EINVAL from OST
3107            anyway. */
3108
3109         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3110                                                 &RQF_OST_SET_GRANT_INFO :
3111                                                 &RQF_OBD_SET_INFO);
3112         if (req == NULL)
3113                 RETURN(-ENOMEM);
3114
3115         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3116                              RCL_CLIENT, keylen);
3117         if (!KEY_IS(KEY_GRANT_SHRINK))
3118                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3119                                      RCL_CLIENT, vallen);
3120         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3121         if (rc) {
3122                 ptlrpc_request_free(req);
3123                 RETURN(rc);
3124         }
3125
3126         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3127         memcpy(tmp, key, keylen);
3128         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3129                                                         &RMF_OST_BODY :
3130                                                         &RMF_SETINFO_VAL);
3131         memcpy(tmp, val, vallen);
3132
3133         if (KEY_IS(KEY_GRANT_SHRINK)) {
3134                 struct osc_grant_args *aa;
3135                 struct obdo *oa;
3136
3137                 aa = ptlrpc_req_async_args(aa, req);
3138                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3139                 if (!oa) {
3140                         ptlrpc_req_finished(req);