Whamcloud - gitweb
LU-6142 lustre: remove module_vars arg to class_register_type()
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <obd.h>
46 #include <obd_cksum.h>
47 #include <obd_class.h>
48 #include <lustre_osc.h>
49 #include <linux/falloc.h>
50
51 #include "osc_internal.h"
52
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
56
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
59 module_param(osc_reqpool_mem_max, uint, 0444);
60
61 static int osc_idle_timeout = 20;
62 module_param(osc_idle_timeout, uint, 0644);
63
64 #define osc_grant_args osc_brw_async_args
65
66 struct osc_setattr_args {
67         struct obdo             *sa_oa;
68         obd_enqueue_update_f     sa_upcall;
69         void                    *sa_cookie;
70 };
71
72 struct osc_fsync_args {
73         struct osc_object       *fa_obj;
74         struct obdo             *fa_oa;
75         obd_enqueue_update_f    fa_upcall;
76         void                    *fa_cookie;
77 };
78
79 struct osc_ladvise_args {
80         struct obdo             *la_oa;
81         obd_enqueue_update_f     la_upcall;
82         void                    *la_cookie;
83 };
84
85 static void osc_release_ppga(struct brw_page **ppga, size_t count);
86 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
87                          void *data, int rc);
88
89 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
90 {
91         struct ost_body *body;
92
93         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94         LASSERT(body);
95
96         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
97 }
98
99 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100                        struct obdo *oa)
101 {
102         struct ptlrpc_request   *req;
103         struct ost_body         *body;
104         int                      rc;
105
106         ENTRY;
107         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
108         if (req == NULL)
109                 RETURN(-ENOMEM);
110
111         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
112         if (rc) {
113                 ptlrpc_request_free(req);
114                 RETURN(rc);
115         }
116
117         osc_pack_req_body(req, oa);
118
119         ptlrpc_request_set_replen(req);
120
121         rc = ptlrpc_queue_wait(req);
122         if (rc)
123                 GOTO(out, rc);
124
125         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
126         if (body == NULL)
127                 GOTO(out, rc = -EPROTO);
128
129         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
130         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
131
132         oa->o_blksize = cli_brw_size(exp->exp_obd);
133         oa->o_valid |= OBD_MD_FLBLKSZ;
134
135         EXIT;
136 out:
137         ptlrpc_req_finished(req);
138
139         return rc;
140 }
141
142 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143                        struct obdo *oa)
144 {
145         struct ptlrpc_request   *req;
146         struct ost_body         *body;
147         int                      rc;
148
149         ENTRY;
150         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
151
152         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
153         if (req == NULL)
154                 RETURN(-ENOMEM);
155
156         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
157         if (rc) {
158                 ptlrpc_request_free(req);
159                 RETURN(rc);
160         }
161
162         osc_pack_req_body(req, oa);
163
164         ptlrpc_request_set_replen(req);
165
166         rc = ptlrpc_queue_wait(req);
167         if (rc)
168                 GOTO(out, rc);
169
170         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
171         if (body == NULL)
172                 GOTO(out, rc = -EPROTO);
173
174         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
175
176         EXIT;
177 out:
178         ptlrpc_req_finished(req);
179
180         RETURN(rc);
181 }
182
183 static int osc_setattr_interpret(const struct lu_env *env,
184                                  struct ptlrpc_request *req, void *args, int rc)
185 {
186         struct osc_setattr_args *sa = args;
187         struct ost_body *body;
188
189         ENTRY;
190
191         if (rc != 0)
192                 GOTO(out, rc);
193
194         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195         if (body == NULL)
196                 GOTO(out, rc = -EPROTO);
197
198         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
199                              &body->oa);
200 out:
201         rc = sa->sa_upcall(sa->sa_cookie, rc);
202         RETURN(rc);
203 }
204
205 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
206                       obd_enqueue_update_f upcall, void *cookie,
207                       struct ptlrpc_request_set *rqset)
208 {
209         struct ptlrpc_request   *req;
210         struct osc_setattr_args *sa;
211         int                      rc;
212
213         ENTRY;
214
215         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
216         if (req == NULL)
217                 RETURN(-ENOMEM);
218
219         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
220         if (rc) {
221                 ptlrpc_request_free(req);
222                 RETURN(rc);
223         }
224
225         osc_pack_req_body(req, oa);
226
227         ptlrpc_request_set_replen(req);
228
229         /* do mds to ost setattr asynchronously */
230         if (!rqset) {
231                 /* Do not wait for response. */
232                 ptlrpcd_add_req(req);
233         } else {
234                 req->rq_interpret_reply = osc_setattr_interpret;
235
236                 sa = ptlrpc_req_async_args(sa, req);
237                 sa->sa_oa = oa;
238                 sa->sa_upcall = upcall;
239                 sa->sa_cookie = cookie;
240
241                 ptlrpc_set_add_req(rqset, req);
242         }
243
244         RETURN(0);
245 }
246
247 static int osc_ladvise_interpret(const struct lu_env *env,
248                                  struct ptlrpc_request *req,
249                                  void *arg, int rc)
250 {
251         struct osc_ladvise_args *la = arg;
252         struct ost_body *body;
253         ENTRY;
254
255         if (rc != 0)
256                 GOTO(out, rc);
257
258         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
259         if (body == NULL)
260                 GOTO(out, rc = -EPROTO);
261
262         *la->la_oa = body->oa;
263 out:
264         rc = la->la_upcall(la->la_cookie, rc);
265         RETURN(rc);
266 }
267
268 /**
269  * If rqset is NULL, do not wait for response. Upcall and cookie could also
270  * be NULL in this case
271  */
272 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
273                      struct ladvise_hdr *ladvise_hdr,
274                      obd_enqueue_update_f upcall, void *cookie,
275                      struct ptlrpc_request_set *rqset)
276 {
277         struct ptlrpc_request   *req;
278         struct ost_body         *body;
279         struct osc_ladvise_args *la;
280         int                      rc;
281         struct lu_ladvise       *req_ladvise;
282         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
283         int                      num_advise = ladvise_hdr->lah_count;
284         struct ladvise_hdr      *req_ladvise_hdr;
285         ENTRY;
286
287         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
288         if (req == NULL)
289                 RETURN(-ENOMEM);
290
291         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
292                              num_advise * sizeof(*ladvise));
293         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
294         if (rc != 0) {
295                 ptlrpc_request_free(req);
296                 RETURN(rc);
297         }
298         req->rq_request_portal = OST_IO_PORTAL;
299         ptlrpc_at_set_req_timeout(req);
300
301         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
302         LASSERT(body);
303         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
304                              oa);
305
306         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
307                                                  &RMF_OST_LADVISE_HDR);
308         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
309
310         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
311         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
312         ptlrpc_request_set_replen(req);
313
314         if (rqset == NULL) {
315                 /* Do not wait for response. */
316                 ptlrpcd_add_req(req);
317                 RETURN(0);
318         }
319
320         req->rq_interpret_reply = osc_ladvise_interpret;
321         la = ptlrpc_req_async_args(la, req);
322         la->la_oa = oa;
323         la->la_upcall = upcall;
324         la->la_cookie = cookie;
325
326         ptlrpc_set_add_req(rqset, req);
327
328         RETURN(0);
329 }
330
331 static int osc_create(const struct lu_env *env, struct obd_export *exp,
332                       struct obdo *oa)
333 {
334         struct ptlrpc_request *req;
335         struct ost_body       *body;
336         int                    rc;
337         ENTRY;
338
339         LASSERT(oa != NULL);
340         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
341         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
342
343         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
344         if (req == NULL)
345                 GOTO(out, rc = -ENOMEM);
346
347         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
348         if (rc) {
349                 ptlrpc_request_free(req);
350                 GOTO(out, rc);
351         }
352
353         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
354         LASSERT(body);
355
356         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
357
358         ptlrpc_request_set_replen(req);
359
360         rc = ptlrpc_queue_wait(req);
361         if (rc)
362                 GOTO(out_req, rc);
363
364         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL)
366                 GOTO(out_req, rc = -EPROTO);
367
368         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
369         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
370
371         oa->o_blksize = cli_brw_size(exp->exp_obd);
372         oa->o_valid |= OBD_MD_FLBLKSZ;
373
374         CDEBUG(D_HA, "transno: %lld\n",
375                lustre_msg_get_transno(req->rq_repmsg));
376 out_req:
377         ptlrpc_req_finished(req);
378 out:
379         RETURN(rc);
380 }
381
382 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
383                    obd_enqueue_update_f upcall, void *cookie)
384 {
385         struct ptlrpc_request *req;
386         struct osc_setattr_args *sa;
387         struct obd_import *imp = class_exp2cliimp(exp);
388         struct ost_body *body;
389         int rc;
390
391         ENTRY;
392
393         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
394         if (req == NULL)
395                 RETURN(-ENOMEM);
396
397         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
398         if (rc < 0) {
399                 ptlrpc_request_free(req);
400                 RETURN(rc);
401         }
402
403         osc_set_io_portal(req);
404
405         ptlrpc_at_set_req_timeout(req);
406
407         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
408
409         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
410
411         ptlrpc_request_set_replen(req);
412
413         req->rq_interpret_reply = osc_setattr_interpret;
414         sa = ptlrpc_req_async_args(sa, req);
415         sa->sa_oa = oa;
416         sa->sa_upcall = upcall;
417         sa->sa_cookie = cookie;
418
419         ptlrpcd_add_req(req);
420
421         RETURN(0);
422 }
423 EXPORT_SYMBOL(osc_punch_send);
424
425 /**
426  * osc_fallocate_base() - Handles fallocate request.
427  *
428  * @exp:        Export structure
429  * @oa:         Attributes passed to OSS from client (obdo structure)
430  * @upcall:     Primary & supplementary group information
431  * @cookie:     Exclusive identifier
432  * @rqset:      Request list.
433  * @mode:       Operation done on given range.
434  *
435  * osc_fallocate_base() - Handles fallocate requests only. Only block
436  * allocation or standard preallocate operation is supported currently.
437  * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
438  * is supported via SETATTR request.
439  *
440  * Return: Non-zero on failure and O on success.
441  */
442 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
443                        obd_enqueue_update_f upcall, void *cookie, int mode)
444 {
445         struct ptlrpc_request *req;
446         struct osc_setattr_args *sa;
447         struct ost_body *body;
448         struct obd_import *imp = class_exp2cliimp(exp);
449         int rc;
450         ENTRY;
451
452         /*
453          * Only mode == 0 (which is standard prealloc) is supported now.
454          * Punch is not supported yet.
455          */
456         if (mode & ~FALLOC_FL_KEEP_SIZE)
457                 RETURN(-EOPNOTSUPP);
458         oa->o_falloc_mode = mode;
459
460         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
461                                    &RQF_OST_FALLOCATE);
462         if (req == NULL)
463                 RETURN(-ENOMEM);
464
465         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
466         if (rc != 0) {
467                 ptlrpc_request_free(req);
468                 RETURN(rc);
469         }
470
471         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
472         LASSERT(body);
473
474         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
475
476         ptlrpc_request_set_replen(req);
477
478         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
479         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
480         sa = ptlrpc_req_async_args(sa, req);
481         sa->sa_oa = oa;
482         sa->sa_upcall = upcall;
483         sa->sa_cookie = cookie;
484
485         ptlrpcd_add_req(req);
486
487         RETURN(0);
488 }
489
490 static int osc_sync_interpret(const struct lu_env *env,
491                               struct ptlrpc_request *req, void *args, int rc)
492 {
493         struct osc_fsync_args *fa = args;
494         struct ost_body *body;
495         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
496         unsigned long valid = 0;
497         struct cl_object *obj;
498         ENTRY;
499
500         if (rc != 0)
501                 GOTO(out, rc);
502
503         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
504         if (body == NULL) {
505                 CERROR("can't unpack ost_body\n");
506                 GOTO(out, rc = -EPROTO);
507         }
508
509         *fa->fa_oa = body->oa;
510         obj = osc2cl(fa->fa_obj);
511
512         /* Update osc object's blocks attribute */
513         cl_object_attr_lock(obj);
514         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
515                 attr->cat_blocks = body->oa.o_blocks;
516                 valid |= CAT_BLOCKS;
517         }
518
519         if (valid != 0)
520                 cl_object_attr_update(env, obj, attr, valid);
521         cl_object_attr_unlock(obj);
522
523 out:
524         rc = fa->fa_upcall(fa->fa_cookie, rc);
525         RETURN(rc);
526 }
527
528 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
529                   obd_enqueue_update_f upcall, void *cookie,
530                   struct ptlrpc_request_set *rqset)
531 {
532         struct obd_export     *exp = osc_export(obj);
533         struct ptlrpc_request *req;
534         struct ost_body       *body;
535         struct osc_fsync_args *fa;
536         int                    rc;
537         ENTRY;
538
539         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
540         if (req == NULL)
541                 RETURN(-ENOMEM);
542
543         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
544         if (rc) {
545                 ptlrpc_request_free(req);
546                 RETURN(rc);
547         }
548
549         /* overload the size and blocks fields in the oa with start/end */
550         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
551         LASSERT(body);
552         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
553
554         ptlrpc_request_set_replen(req);
555         req->rq_interpret_reply = osc_sync_interpret;
556
557         fa = ptlrpc_req_async_args(fa, req);
558         fa->fa_obj = obj;
559         fa->fa_oa = oa;
560         fa->fa_upcall = upcall;
561         fa->fa_cookie = cookie;
562
563         ptlrpc_set_add_req(rqset, req);
564
565         RETURN (0);
566 }
567
568 /* Find and cancel locally locks matched by @mode in the resource found by
569  * @objid. Found locks are added into @cancel list. Returns the amount of
570  * locks added to @cancels list. */
571 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
572                                    struct list_head *cancels,
573                                    enum ldlm_mode mode, __u64 lock_flags)
574 {
575         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
576         struct ldlm_res_id res_id;
577         struct ldlm_resource *res;
578         int count;
579         ENTRY;
580
581         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
582          * export) but disabled through procfs (flag in NS).
583          *
584          * This distinguishes from a case when ELC is not supported originally,
585          * when we still want to cancel locks in advance and just cancel them
586          * locally, without sending any RPC. */
587         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
588                 RETURN(0);
589
590         ostid_build_res_name(&oa->o_oi, &res_id);
591         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
592         if (IS_ERR(res))
593                 RETURN(0);
594
595         LDLM_RESOURCE_ADDREF(res);
596         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
597                                            lock_flags, 0, NULL);
598         LDLM_RESOURCE_DELREF(res);
599         ldlm_resource_putref(res);
600         RETURN(count);
601 }
602
603 static int osc_destroy_interpret(const struct lu_env *env,
604                                  struct ptlrpc_request *req, void *args, int rc)
605 {
606         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
607
608         atomic_dec(&cli->cl_destroy_in_flight);
609         wake_up(&cli->cl_destroy_waitq);
610
611         return 0;
612 }
613
614 static int osc_can_send_destroy(struct client_obd *cli)
615 {
616         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617             cli->cl_max_rpcs_in_flight) {
618                 /* The destroy request can be sent */
619                 return 1;
620         }
621         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622             cli->cl_max_rpcs_in_flight) {
623                 /*
624                  * The counter has been modified between the two atomic
625                  * operations.
626                  */
627                 wake_up(&cli->cl_destroy_waitq);
628         }
629         return 0;
630 }
631
632 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
633                        struct obdo *oa)
634 {
635         struct client_obd     *cli = &exp->exp_obd->u.cli;
636         struct ptlrpc_request *req;
637         struct ost_body       *body;
638         LIST_HEAD(cancels);
639         int rc, count;
640         ENTRY;
641
642         if (!oa) {
643                 CDEBUG(D_INFO, "oa NULL\n");
644                 RETURN(-EINVAL);
645         }
646
647         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
648                                         LDLM_FL_DISCARD_DATA);
649
650         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
651         if (req == NULL) {
652                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
653                 RETURN(-ENOMEM);
654         }
655
656         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
657                                0, &cancels, count);
658         if (rc) {
659                 ptlrpc_request_free(req);
660                 RETURN(rc);
661         }
662
663         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
664         ptlrpc_at_set_req_timeout(req);
665
666         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
667         LASSERT(body);
668         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
669
670         ptlrpc_request_set_replen(req);
671
672         req->rq_interpret_reply = osc_destroy_interpret;
673         if (!osc_can_send_destroy(cli)) {
674                 /*
675                  * Wait until the number of on-going destroy RPCs drops
676                  * under max_rpc_in_flight
677                  */
678                 rc = l_wait_event_abortable_exclusive(
679                         cli->cl_destroy_waitq,
680                         osc_can_send_destroy(cli));
681                 if (rc) {
682                         ptlrpc_req_finished(req);
683                         RETURN(-EINTR);
684                 }
685         }
686
687         /* Do not wait for response */
688         ptlrpcd_add_req(req);
689         RETURN(0);
690 }
691
692 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
693                                 long writing_bytes)
694 {
695         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
696
697         LASSERT(!(oa->o_valid & bits));
698
699         oa->o_valid |= bits;
700         spin_lock(&cli->cl_loi_list_lock);
701         if (cli->cl_ocd_grant_param)
702                 oa->o_dirty = cli->cl_dirty_grant;
703         else
704                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
705         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
706                 CERROR("dirty %lu > dirty_max %lu\n",
707                        cli->cl_dirty_pages,
708                        cli->cl_dirty_max_pages);
709                 oa->o_undirty = 0;
710         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
711                             (long)(obd_max_dirty_pages + 1))) {
712                 /* The atomic_read() allowing the atomic_inc() are
713                  * not covered by a lock thus they may safely race and trip
714                  * this CERROR() unless we add in a small fudge factor (+1). */
715                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
716                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
717                        obd_max_dirty_pages);
718                 oa->o_undirty = 0;
719         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
720                             0x7fffffff)) {
721                 CERROR("dirty %lu - dirty_max %lu too big???\n",
722                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
723                 oa->o_undirty = 0;
724         } else {
725                 unsigned long nrpages;
726                 unsigned long undirty;
727
728                 nrpages = cli->cl_max_pages_per_rpc;
729                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
730                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
731                 undirty = nrpages << PAGE_SHIFT;
732                 if (cli->cl_ocd_grant_param) {
733                         int nrextents;
734
735                         /* take extent tax into account when asking for more
736                          * grant space */
737                         nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
738                                      cli->cl_max_extent_pages;
739                         undirty += nrextents * cli->cl_grant_extent_tax;
740                 }
741                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
742                  * to add extent tax, etc.
743                  */
744                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
745                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
746         }
747         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
748         /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
749         if (cli->cl_lost_grant > INT_MAX) {
750                 CDEBUG(D_CACHE,
751                       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
752                       cli_name(cli), cli->cl_lost_grant);
753                 oa->o_dropped = INT_MAX;
754         } else {
755                 oa->o_dropped = cli->cl_lost_grant;
756         }
757         cli->cl_lost_grant -= oa->o_dropped;
758         spin_unlock(&cli->cl_loi_list_lock);
759         CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
760                " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
761                oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
762 }
763
764 void osc_update_next_shrink(struct client_obd *cli)
765 {
766         cli->cl_next_shrink_grant = ktime_get_seconds() +
767                                     cli->cl_grant_shrink_interval;
768
769         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
770                cli->cl_next_shrink_grant);
771 }
772
773 static void __osc_update_grant(struct client_obd *cli, u64 grant)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         cli->cl_avail_grant += grant;
777         spin_unlock(&cli->cl_loi_list_lock);
778 }
779
780 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
781 {
782         if (body->oa.o_valid & OBD_MD_FLGRANT) {
783                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
784                 __osc_update_grant(cli, body->oa.o_grant);
785         }
786 }
787
788 /**
789  * grant thread data for shrinking space.
790  */
791 struct grant_thread_data {
792         struct list_head        gtd_clients;
793         struct mutex            gtd_mutex;
794         unsigned long           gtd_stopped:1;
795 };
796 static struct grant_thread_data client_gtd;
797
798 static int osc_shrink_grant_interpret(const struct lu_env *env,
799                                       struct ptlrpc_request *req,
800                                       void *args, int rc)
801 {
802         struct osc_grant_args *aa = args;
803         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
804         struct ost_body *body;
805
806         if (rc != 0) {
807                 __osc_update_grant(cli, aa->aa_oa->o_grant);
808                 GOTO(out, rc);
809         }
810
811         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
812         LASSERT(body);
813         osc_update_grant(cli, body);
814 out:
815         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
816         aa->aa_oa = NULL;
817
818         return rc;
819 }
820
821 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
822 {
823         spin_lock(&cli->cl_loi_list_lock);
824         oa->o_grant = cli->cl_avail_grant / 4;
825         cli->cl_avail_grant -= oa->o_grant;
826         spin_unlock(&cli->cl_loi_list_lock);
827         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
828                 oa->o_valid |= OBD_MD_FLFLAGS;
829                 oa->o_flags = 0;
830         }
831         oa->o_flags |= OBD_FL_SHRINK_GRANT;
832         osc_update_next_shrink(cli);
833 }
834
835 /* Shrink the current grant, either from some large amount to enough for a
836  * full set of in-flight RPCs, or if we have already shrunk to that limit
837  * then to enough for a single RPC.  This avoids keeping more grant than
838  * needed, and avoids shrinking the grant piecemeal. */
839 static int osc_shrink_grant(struct client_obd *cli)
840 {
841         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
842                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
843
844         spin_lock(&cli->cl_loi_list_lock);
845         if (cli->cl_avail_grant <= target_bytes)
846                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
847         spin_unlock(&cli->cl_loi_list_lock);
848
849         return osc_shrink_grant_to_target(cli, target_bytes);
850 }
851
852 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
853 {
854         int                     rc = 0;
855         struct ost_body        *body;
856         ENTRY;
857
858         spin_lock(&cli->cl_loi_list_lock);
859         /* Don't shrink if we are already above or below the desired limit
860          * We don't want to shrink below a single RPC, as that will negatively
861          * impact block allocation and long-term performance. */
862         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
863                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
864
865         if (target_bytes >= cli->cl_avail_grant) {
866                 spin_unlock(&cli->cl_loi_list_lock);
867                 RETURN(0);
868         }
869         spin_unlock(&cli->cl_loi_list_lock);
870
871         OBD_ALLOC_PTR(body);
872         if (!body)
873                 RETURN(-ENOMEM);
874
875         osc_announce_cached(cli, &body->oa, 0);
876
877         spin_lock(&cli->cl_loi_list_lock);
878         if (target_bytes >= cli->cl_avail_grant) {
879                 /* available grant has changed since target calculation */
880                 spin_unlock(&cli->cl_loi_list_lock);
881                 GOTO(out_free, rc = 0);
882         }
883         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
884         cli->cl_avail_grant = target_bytes;
885         spin_unlock(&cli->cl_loi_list_lock);
886         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
887                 body->oa.o_valid |= OBD_MD_FLFLAGS;
888                 body->oa.o_flags = 0;
889         }
890         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
891         osc_update_next_shrink(cli);
892
893         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
894                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
895                                 sizeof(*body), body, NULL);
896         if (rc != 0)
897                 __osc_update_grant(cli, body->oa.o_grant);
898 out_free:
899         OBD_FREE_PTR(body);
900         RETURN(rc);
901 }
902
903 static int osc_should_shrink_grant(struct client_obd *client)
904 {
905         time64_t next_shrink = client->cl_next_shrink_grant;
906
907         if (client->cl_import == NULL)
908                 return 0;
909
910         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
911             client->cl_import->imp_grant_shrink_disabled) {
912                 osc_update_next_shrink(client);
913                 return 0;
914         }
915
916         if (ktime_get_seconds() >= next_shrink - 5) {
917                 /* Get the current RPC size directly, instead of going via:
918                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
919                  * Keep comment here so that it can be found by searching. */
920                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
921
922                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
923                     client->cl_avail_grant > brw_size)
924                         return 1;
925                 else
926                         osc_update_next_shrink(client);
927         }
928         return 0;
929 }
930
/* Maximum number of grant shrink requests that one run of
 * osc_grant_work_handler() will issue. */
#define GRANT_SHRINK_RPC_BATCH  100

/* Work item that runs osc_grant_work_handler(); it is initialised in
 * osc_start_grant_work() and re-armed by the handler itself. */
static struct delayed_work work;
934
935 static void osc_grant_work_handler(struct work_struct *data)
936 {
937         struct client_obd *cli;
938         int rpc_sent;
939         bool init_next_shrink = true;
940         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
941
942         rpc_sent = 0;
943         mutex_lock(&client_gtd.gtd_mutex);
944         list_for_each_entry(cli, &client_gtd.gtd_clients,
945                             cl_grant_chain) {
946                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
947                     osc_should_shrink_grant(cli)) {
948                         osc_shrink_grant(cli);
949                         rpc_sent++;
950                 }
951
952                 if (!init_next_shrink) {
953                         if (cli->cl_next_shrink_grant < next_shrink &&
954                             cli->cl_next_shrink_grant > ktime_get_seconds())
955                                 next_shrink = cli->cl_next_shrink_grant;
956                 } else {
957                         init_next_shrink = false;
958                         next_shrink = cli->cl_next_shrink_grant;
959                 }
960         }
961         mutex_unlock(&client_gtd.gtd_mutex);
962
963         if (client_gtd.gtd_stopped == 1)
964                 return;
965
966         if (next_shrink > ktime_get_seconds()) {
967                 time64_t delay = next_shrink - ktime_get_seconds();
968
969                 schedule_delayed_work(&work, cfs_time_seconds(delay));
970         } else {
971                 schedule_work(&work.work);
972         }
973 }
974
975 void osc_schedule_grant_work(void)
976 {
977         cancel_delayed_work_sync(&work);
978         schedule_work(&work.work);
979 }
980
981 /**
982  * Start grant thread for returing grant to server for idle clients.
983  */
984 static int osc_start_grant_work(void)
985 {
986         client_gtd.gtd_stopped = 0;
987         mutex_init(&client_gtd.gtd_mutex);
988         INIT_LIST_HEAD(&client_gtd.gtd_clients);
989
990         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
991         schedule_work(&work.work);
992
993         return 0;
994 }
995
996 static void osc_stop_grant_work(void)
997 {
998         client_gtd.gtd_stopped = 1;
999         cancel_delayed_work_sync(&work);
1000 }
1001
1002 static void osc_add_grant_list(struct client_obd *client)
1003 {
1004         mutex_lock(&client_gtd.gtd_mutex);
1005         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
1006         mutex_unlock(&client_gtd.gtd_mutex);
1007 }
1008
1009 static void osc_del_grant_list(struct client_obd *client)
1010 {
1011         if (list_empty(&client->cl_grant_chain))
1012                 return;
1013
1014         mutex_lock(&client_gtd.gtd_mutex);
1015         list_del_init(&client->cl_grant_chain);
1016         mutex_unlock(&client_gtd.gtd_mutex);
1017 }
1018
1019 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1020 {
1021         /*
1022          * ocd_grant is the total grant amount we're expect to hold: if we've
1023          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1024          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1025          * dirty.
1026          *
1027          * race is tolerable here: if we're evicted, but imp_state already
1028          * left EVICTED state, then cl_dirty_pages must be 0 already.
1029          */
1030         spin_lock(&cli->cl_loi_list_lock);
1031         cli->cl_avail_grant = ocd->ocd_grant;
1032         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1033                 unsigned long consumed = cli->cl_reserved_grant;
1034
1035                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1036                         consumed += cli->cl_dirty_grant;
1037                 else
1038                         consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1039                 if (cli->cl_avail_grant < consumed) {
1040                         CERROR("%s: granted %ld but already consumed %ld\n",
1041                                cli_name(cli), cli->cl_avail_grant, consumed);
1042                         cli->cl_avail_grant = 0;
1043                 } else {
1044                         cli->cl_avail_grant -= consumed;
1045                 }
1046         }
1047
1048         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1049                 u64 size;
1050                 int chunk_mask;
1051
1052                 /* overhead for each extent insertion */
1053                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1054                 /* determine the appropriate chunk size used by osc_extent. */
1055                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1056                                           ocd->ocd_grant_blkbits);
1057                 /* max_pages_per_rpc must be chunk aligned */
1058                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1059                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1060                                              ~chunk_mask) & chunk_mask;
1061                 /* determine maximum extent size, in #pages */
1062                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1063                 cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
1064                 cli->cl_ocd_grant_param = 1;
1065         } else {
1066                 cli->cl_ocd_grant_param = 0;
1067                 cli->cl_grant_extent_tax = 0;
1068                 cli->cl_chunkbits = PAGE_SHIFT;
1069                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1070         }
1071         spin_unlock(&cli->cl_loi_list_lock);
1072
1073         CDEBUG(D_CACHE,
1074                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1075                cli_name(cli),
1076                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1077                cli->cl_max_extent_pages);
1078
1079         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1080                 osc_add_grant_list(cli);
1081 }
1082 EXPORT_SYMBOL(osc_init_grant);
1083
1084 /* We assume that the reason this OSC got a short read is because it read
1085  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1086  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1087  * this stripe never got written at or beyond this stripe offset yet. */
1088 static void handle_short_read(int nob_read, size_t page_count,
1089                               struct brw_page **pga)
1090 {
1091         char *ptr;
1092         int i = 0;
1093
1094         /* skip bytes read OK */
1095         while (nob_read > 0) {
1096                 LASSERT (page_count > 0);
1097
1098                 if (pga[i]->count > nob_read) {
1099                         /* EOF inside this page */
1100                         ptr = kmap(pga[i]->pg) +
1101                                 (pga[i]->off & ~PAGE_MASK);
1102                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1103                         kunmap(pga[i]->pg);
1104                         page_count--;
1105                         i++;
1106                         break;
1107                 }
1108
1109                 nob_read -= pga[i]->count;
1110                 page_count--;
1111                 i++;
1112         }
1113
1114         /* zero remaining pages */
1115         while (page_count-- > 0) {
1116                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1117                 memset(ptr, 0, pga[i]->count);
1118                 kunmap(pga[i]->pg);
1119                 i++;
1120         }
1121 }
1122
1123 static int check_write_rcs(struct ptlrpc_request *req,
1124                            int requested_nob, int niocount,
1125                            size_t page_count, struct brw_page **pga)
1126 {
1127         int     i;
1128         __u32   *remote_rcs;
1129
1130         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1131                                                   sizeof(*remote_rcs) *
1132                                                   niocount);
1133         if (remote_rcs == NULL) {
1134                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1135                 return(-EPROTO);
1136         }
1137
1138         /* return error if any niobuf was in error */
1139         for (i = 0; i < niocount; i++) {
1140                 if ((int)remote_rcs[i] < 0) {
1141                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1142                                i, remote_rcs[i], req);
1143                         return remote_rcs[i];
1144                 }
1145
1146                 if (remote_rcs[i] != 0) {
1147                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1148                                 i, remote_rcs[i], req);
1149                         return(-EPROTO);
1150                 }
1151         }
1152         if (req->rq_bulk != NULL &&
1153             req->rq_bulk->bd_nob_transferred != requested_nob) {
1154                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1155                        req->rq_bulk->bd_nob_transferred, requested_nob);
1156                 return(-EPROTO);
1157         }
1158
1159         return (0);
1160 }
1161
1162 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1163 {
1164         if (p1->flag != p2->flag) {
1165                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1166                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1167                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1168
1169                 /* warn if we try to combine flags that we don't know to be
1170                  * safe to combine */
1171                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1172                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1173                               "report this at https://jira.whamcloud.com/\n",
1174                               p1->flag, p2->flag);
1175                 }
1176                 return 0;
1177         }
1178
1179         return (p1->off + p1->count == p2->off);
1180 }
1181
1182 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1183 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1184                                    size_t pg_count, struct brw_page **pga,
1185                                    int opc, obd_dif_csum_fn *fn,
1186                                    int sector_size,
1187                                    u32 *check_sum)
1188 {
1189         struct ahash_request *req;
1190         /* Used Adler as the default checksum type on top of DIF tags */
1191         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1192         struct page *__page;
1193         unsigned char *buffer;
1194         __u16 *guard_start;
1195         unsigned int bufsize;
1196         int guard_number;
1197         int used_number = 0;
1198         int used;
1199         u32 cksum;
1200         int rc = 0;
1201         int i = 0;
1202
1203         LASSERT(pg_count > 0);
1204
1205         __page = alloc_page(GFP_KERNEL);
1206         if (__page == NULL)
1207                 return -ENOMEM;
1208
1209         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1210         if (IS_ERR(req)) {
1211                 rc = PTR_ERR(req);
1212                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1213                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1214                 GOTO(out, rc);
1215         }
1216
1217         buffer = kmap(__page);
1218         guard_start = (__u16 *)buffer;
1219         guard_number = PAGE_SIZE / sizeof(*guard_start);
1220         while (nob > 0 && pg_count > 0) {
1221                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1222
1223                 /* corrupt the data before we compute the checksum, to
1224                  * simulate an OST->client data error */
1225                 if (unlikely(i == 0 && opc == OST_READ &&
1226                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1227                         unsigned char *ptr = kmap(pga[i]->pg);
1228                         int off = pga[i]->off & ~PAGE_MASK;
1229
1230                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1231                         kunmap(pga[i]->pg);
1232                 }
1233
1234                 /*
1235                  * The left guard number should be able to hold checksums of a
1236                  * whole page
1237                  */
1238                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1239                                                   pga[i]->off & ~PAGE_MASK,
1240                                                   count,
1241                                                   guard_start + used_number,
1242                                                   guard_number - used_number,
1243                                                   &used, sector_size,
1244                                                   fn);
1245                 if (rc)
1246                         break;
1247
1248                 used_number += used;
1249                 if (used_number == guard_number) {
1250                         cfs_crypto_hash_update_page(req, __page, 0,
1251                                 used_number * sizeof(*guard_start));
1252                         used_number = 0;
1253                 }
1254
1255                 nob -= pga[i]->count;
1256                 pg_count--;
1257                 i++;
1258         }
1259         kunmap(__page);
1260         if (rc)
1261                 GOTO(out, rc);
1262
1263         if (used_number != 0)
1264                 cfs_crypto_hash_update_page(req, __page, 0,
1265                         used_number * sizeof(*guard_start));
1266
1267         bufsize = sizeof(cksum);
1268         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1269
1270         /* For sending we only compute the wrong checksum instead
1271          * of corrupting the data so it is still correct on a redo */
1272         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1273                 cksum++;
1274
1275         *check_sum = cksum;
1276 out:
1277         __free_page(__page);
1278         return rc;
1279 }
1280 #else /* !CONFIG_CRC_T10DIF */
1281 #define obd_dif_ip_fn NULL
1282 #define obd_dif_crc_fn NULL
1283 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1284         -EOPNOTSUPP
1285 #endif /* CONFIG_CRC_T10DIF */
1286
1287 static int osc_checksum_bulk(int nob, size_t pg_count,
1288                              struct brw_page **pga, int opc,
1289                              enum cksum_types cksum_type,
1290                              u32 *cksum)
1291 {
1292         int                             i = 0;
1293         struct ahash_request           *req;
1294         unsigned int                    bufsize;
1295         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1296
1297         LASSERT(pg_count > 0);
1298
1299         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1300         if (IS_ERR(req)) {
1301                 CERROR("Unable to initialize checksum hash %s\n",
1302                        cfs_crypto_hash_name(cfs_alg));
1303                 return PTR_ERR(req);
1304         }
1305
1306         while (nob > 0 && pg_count > 0) {
1307                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1308
1309                 /* corrupt the data before we compute the checksum, to
1310                  * simulate an OST->client data error */
1311                 if (i == 0 && opc == OST_READ &&
1312                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1313                         unsigned char *ptr = kmap(pga[i]->pg);
1314                         int off = pga[i]->off & ~PAGE_MASK;
1315
1316                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1317                         kunmap(pga[i]->pg);
1318                 }
1319                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1320                                             pga[i]->off & ~PAGE_MASK,
1321                                             count);
1322                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1323                                (int)(pga[i]->off & ~PAGE_MASK));
1324
1325                 nob -= pga[i]->count;
1326                 pg_count--;
1327                 i++;
1328         }
1329
1330         bufsize = sizeof(*cksum);
1331         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1332
1333         /* For sending we only compute the wrong checksum instead
1334          * of corrupting the data so it is still correct on a redo */
1335         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1336                 (*cksum)++;
1337
1338         return 0;
1339 }
1340
1341 static int osc_checksum_bulk_rw(const char *obd_name,
1342                                 enum cksum_types cksum_type,
1343                                 int nob, size_t pg_count,
1344                                 struct brw_page **pga, int opc,
1345                                 u32 *check_sum)
1346 {
1347         obd_dif_csum_fn *fn = NULL;
1348         int sector_size = 0;
1349         int rc;
1350
1351         ENTRY;
1352         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1353
1354         if (fn)
1355                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1356                                              opc, fn, sector_size, check_sum);
1357         else
1358                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1359                                        check_sum);
1360
1361         RETURN(rc);
1362 }
1363
1364 static inline void osc_release_bounce_pages(struct brw_page **pga,
1365                                             u32 page_count)
1366 {
1367 #ifdef HAVE_LUSTRE_CRYPTO
1368         int i;
1369
1370         for (i = 0; i < page_count; i++) {
1371                 /* Bounce pages allocated by a call to
1372                  * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
1373                  * are identified thanks to the PageChecked flag.
1374                  */
1375                 if (PageChecked(pga[i]->pg))
1376                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1377                 pga[i]->count -= pga[i]->bp_count_diff;
1378                 pga[i]->off += pga[i]->bp_off_diff;
1379         }
1380 #endif
1381 }
1382
1383 static int
1384 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1385                      u32 page_count, struct brw_page **pga,
1386                      struct ptlrpc_request **reqp, int resend)
1387 {
1388         struct ptlrpc_request *req;
1389         struct ptlrpc_bulk_desc *desc;
1390         struct ost_body *body;
1391         struct obd_ioobj *ioobj;
1392         struct niobuf_remote *niobuf;
1393         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1394         struct osc_brw_async_args *aa;
1395         struct req_capsule *pill;
1396         struct brw_page *pg_prev;
1397         void *short_io_buf;
1398         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1399         struct inode *inode;
1400         bool directio = false;
1401
1402         ENTRY;
1403         inode = page2inode(pga[0]->pg);
1404         if (inode == NULL) {
1405                 /* Try to get reference to inode from cl_page if we are
1406                  * dealing with direct IO, as handled pages are not
1407                  * actual page cache pages.
1408                  */
1409                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1410                 struct cl_page *clpage = oap2cl_page(oap);
1411
1412                 inode = clpage->cp_inode;
1413                 if (inode)
1414                         directio = true;
1415         }
1416         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1417                 RETURN(-ENOMEM); /* Recoverable */
1418         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1419                 RETURN(-EINVAL); /* Fatal */
1420
1421         if ((cmd & OBD_BRW_WRITE) != 0) {
1422                 opc = OST_WRITE;
1423                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1424                                                 osc_rq_pool,
1425                                                 &RQF_OST_BRW_WRITE);
1426         } else {
1427                 opc = OST_READ;
1428                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1429         }
1430         if (req == NULL)
1431                 RETURN(-ENOMEM);
1432
1433         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1434                 for (i = 0; i < page_count; i++) {
1435                         struct brw_page *pg = pga[i];
1436                         struct page *data_page = NULL;
1437                         bool retried = false;
1438                         bool lockedbymyself;
1439                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1440                         struct address_space *map_orig = NULL;
1441                         pgoff_t index_orig;
1442
1443 retry_encrypt:
1444                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1445                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1446                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1447                         /* The page can already be locked when we arrive here.
1448                          * This is possible when cl_page_assume/vvp_page_assume
1449                          * is stuck on wait_on_page_writeback with page lock
1450                          * held. In this case there is no risk for the lock to
1451                          * be released while we are doing our encryption
1452                          * processing, because writeback against that page will
1453                          * end in vvp_page_completion_write/cl_page_completion,
1454                          * which means only once the page is fully processed.
1455                          */
1456                         lockedbymyself = trylock_page(pg->pg);
1457                         if (directio) {
1458                                 map_orig = pg->pg->mapping;
1459                                 pg->pg->mapping = inode->i_mapping;
1460                                 index_orig = pg->pg->index;
1461                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1462                         }
1463                         data_page =
1464                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1465                                                                  nunits, 0,
1466                                                                  GFP_NOFS);
1467                         if (directio) {
1468                                 pg->pg->mapping = map_orig;
1469                                 pg->pg->index = index_orig;
1470                         }
1471                         if (lockedbymyself)
1472                                 unlock_page(pg->pg);
1473                         if (IS_ERR(data_page)) {
1474                                 rc = PTR_ERR(data_page);
1475                                 if (rc == -ENOMEM && !retried) {
1476                                         retried = true;
1477                                         rc = 0;
1478                                         goto retry_encrypt;
1479                                 }
1480                                 ptlrpc_request_free(req);
1481                                 RETURN(rc);
1482                         }
1483                         /* Set PageChecked flag on bounce page for
1484                          * disambiguation in osc_release_bounce_pages().
1485                          */
1486                         SetPageChecked(data_page);
1487                         pg->pg = data_page;
1488                         /* there should be no gap in the middle of page array */
1489                         if (i == page_count - 1) {
1490                                 struct osc_async_page *oap = brw_page2oap(pg);
1491
1492                                 oa->o_size = oap->oap_count +
1493                                         oap->oap_obj_off + oap->oap_page_off;
1494                         }
1495                         /* len is forced to nunits, and relative offset to 0
1496                          * so store the old, clear text info
1497                          */
1498                         pg->bp_count_diff = nunits - pg->count;
1499                         pg->count = nunits;
1500                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1501                         pg->off = pg->off & PAGE_MASK;
1502                 }
1503         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1504                 for (i = 0; i < page_count; i++) {
1505                         struct brw_page *pg = pga[i];
1506                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1507
1508                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1509                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1510                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1511                         /* count/off are forced to cover the whole encryption
1512                          * unit size so that all encrypted data is stored on the
1513                          * OST, so adjust bp_{count,off}_diff for the size of
1514                          * the clear text.
1515                          */
1516                         pg->bp_count_diff = nunits - pg->count;
1517                         pg->count = nunits;
1518                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1519                         pg->off = pg->off & PAGE_MASK;
1520                 }
1521         }
1522
1523         for (niocount = i = 1; i < page_count; i++) {
1524                 if (!can_merge_pages(pga[i - 1], pga[i]))
1525                         niocount++;
1526         }
1527
1528         pill = &req->rq_pill;
1529         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1530                              sizeof(*ioobj));
1531         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1532                              niocount * sizeof(*niobuf));
1533
1534         for (i = 0; i < page_count; i++) {
1535                 short_io_size += pga[i]->count;
1536                 if (!inode || !IS_ENCRYPTED(inode)) {
1537                         pga[i]->bp_count_diff = 0;
1538                         pga[i]->bp_off_diff = 0;
1539                 }
1540         }
1541
1542         /* Check if read/write is small enough to be a short io. */
1543         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1544             !imp_connect_shortio(cli->cl_import))
1545                 short_io_size = 0;
1546
1547         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1548                              opc == OST_READ ? 0 : short_io_size);
1549         if (opc == OST_READ)
1550                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1551                                      short_io_size);
1552
1553         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1554         if (rc) {
1555                 ptlrpc_request_free(req);
1556                 RETURN(rc);
1557         }
1558         osc_set_io_portal(req);
1559
1560         ptlrpc_at_set_req_timeout(req);
1561         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1562          * retry logic */
1563         req->rq_no_retry_einprogress = 1;
1564
1565         if (short_io_size != 0) {
1566                 desc = NULL;
1567                 short_io_buf = NULL;
1568                 goto no_bulk;
1569         }
1570
1571         desc = ptlrpc_prep_bulk_imp(req, page_count,
1572                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1573                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1574                         PTLRPC_BULK_PUT_SINK),
1575                 OST_BULK_PORTAL,
1576                 &ptlrpc_bulk_kiov_pin_ops);
1577
1578         if (desc == NULL)
1579                 GOTO(out, rc = -ENOMEM);
1580         /* NB request now owns desc and will free it when it gets freed */
1581 no_bulk:
1582         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1583         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1584         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1585         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1586
1587         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1588
1589         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1590          * and from_kgid(), because they are asynchronous. Fortunately, variable
1591          * oa contains valid o_uid and o_gid in these two operations.
1592          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1593          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1594          * other process logic */
1595         body->oa.o_uid = oa->o_uid;
1596         body->oa.o_gid = oa->o_gid;
1597
1598         obdo_to_ioobj(oa, ioobj);
1599         ioobj->ioo_bufcnt = niocount;
1600         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1601          * of bulks that might be sent for this request.  The actual number is
1602          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1603          * sends "max - 1" for old client compatibility sending "0", and also so
1604          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1605         if (desc != NULL)
1606                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1607         else /* short io */
1608                 ioobj_max_brw_set(ioobj, 0);
1609
1610         if (short_io_size != 0) {
1611                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1612                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1613                         body->oa.o_flags = 0;
1614                 }
1615                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1616                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1617                        short_io_size);
1618                 if (opc == OST_WRITE) {
1619                         short_io_buf = req_capsule_client_get(pill,
1620                                                               &RMF_SHORT_IO);
1621                         LASSERT(short_io_buf != NULL);
1622                 }
1623         }
1624
1625         LASSERT(page_count > 0);
1626         pg_prev = pga[0];
1627         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1628                 struct brw_page *pg = pga[i];
1629                 int poff = pg->off & ~PAGE_MASK;
1630
1631                 LASSERT(pg->count > 0);
1632                 /* make sure there is no gap in the middle of page array */
1633                 LASSERTF(page_count == 1 ||
1634                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1635                           ergo(i > 0 && i < page_count - 1,
1636                                poff == 0 && pg->count == PAGE_SIZE)   &&
1637                           ergo(i == page_count - 1, poff == 0)),
1638                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1639                          i, page_count, pg, pg->off, pg->count);
1640                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1641                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1642                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1643                          i, page_count,
1644                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1645                          pg_prev->pg, page_private(pg_prev->pg),
1646                          pg_prev->pg->index, pg_prev->off);
1647                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1648                         (pg->flag & OBD_BRW_SRVLOCK));
1649                 if (short_io_size != 0 && opc == OST_WRITE) {
1650                         unsigned char *ptr = kmap_atomic(pg->pg);
1651
1652                         LASSERT(short_io_size >= requested_nob + pg->count);
1653                         memcpy(short_io_buf + requested_nob,
1654                                ptr + poff,
1655                                pg->count);
1656                         kunmap_atomic(ptr);
1657                 } else if (short_io_size == 0) {
1658                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1659                                                          pg->count);
1660                 }
1661                 requested_nob += pg->count;
1662
1663                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1664                         niobuf--;
1665                         niobuf->rnb_len += pg->count;
1666                 } else {
1667                         niobuf->rnb_offset = pg->off;
1668                         niobuf->rnb_len    = pg->count;
1669                         niobuf->rnb_flags  = pg->flag;
1670                 }
1671                 pg_prev = pg;
1672         }
1673
1674         LASSERTF((void *)(niobuf - niocount) ==
1675                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1676                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1677                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1678
1679         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1680         if (resend) {
1681                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1682                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1683                         body->oa.o_flags = 0;
1684                 }
1685                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1686         }
1687
1688         if (osc_should_shrink_grant(cli))
1689                 osc_shrink_grant_local(cli, &body->oa);
1690
1691         /* size[REQ_REC_OFF] still sizeof (*body) */
1692         if (opc == OST_WRITE) {
1693                 if (cli->cl_checksum &&
1694                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1695                         /* store cl_cksum_type in a local variable since
1696                          * it can be changed via lprocfs */
1697                         enum cksum_types cksum_type = cli->cl_cksum_type;
1698
1699                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1700                                 body->oa.o_flags = 0;
1701
1702                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1703                                                                 cksum_type);
1704                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1705
1706                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1707                                                   requested_nob, page_count,
1708                                                   pga, OST_WRITE,
1709                                                   &body->oa.o_cksum);
1710                         if (rc < 0) {
1711                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1712                                        rc);
1713                                 GOTO(out, rc);
1714                         }
1715                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1716                                body->oa.o_cksum);
1717
1718                         /* save this in 'oa', too, for later checking */
1719                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1720                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1721                                                            cksum_type);
1722                 } else {
1723                         /* clear out the checksum flag, in case this is a
1724                          * resend but cl_checksum is no longer set. b=11238 */
1725                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1726                 }
1727                 oa->o_cksum = body->oa.o_cksum;
1728                 /* 1 RC per niobuf */
1729                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1730                                      sizeof(__u32) * niocount);
1731         } else {
1732                 if (cli->cl_checksum &&
1733                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1734                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1735                                 body->oa.o_flags = 0;
1736                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1737                                 cli->cl_cksum_type);
1738                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1739                 }
1740
1741                 /* Client cksum has been already copied to wire obdo in previous
1742                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1743                  * resent due to cksum error, this will allow Server to
1744                  * check+dump pages on its side */
1745         }
1746         ptlrpc_request_set_replen(req);
1747
1748         aa = ptlrpc_req_async_args(aa, req);
1749         aa->aa_oa = oa;
1750         aa->aa_requested_nob = requested_nob;
1751         aa->aa_nio_count = niocount;
1752         aa->aa_page_count = page_count;
1753         aa->aa_resends = 0;
1754         aa->aa_ppga = pga;
1755         aa->aa_cli = cli;
1756         INIT_LIST_HEAD(&aa->aa_oaps);
1757
1758         *reqp = req;
1759         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1760         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1761                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1762                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1763         RETURN(0);
1764
1765  out:
1766         ptlrpc_req_finished(req);
1767         RETURN(rc);
1768 }
1769
/* Name of the checksum dump file.  dump_all_bulk_pages() formats the path into
 * this single file-scope buffer (PATH_MAX bytes) before opening it and reuses
 * it in its log messages. */
1770 char dbgcksum_file_name[PATH_MAX];
1771
/*
 * Dump the data of all pages of a bulk transfer into a debug file after a
 * checksum mismatch.
 *
 * The file name is built in the global dbgcksum_file_name from the libcfs
 * debug path (LIBCFS_DEBUG_FILE_PATH_DEFAULT when that path starts with
 * "NONE"), the parent FID taken from oa when OBD_MD_FLFID is valid (zeros
 * otherwise), the byte extent covered by pga, and both checksums.
 *
 * oa            obdo providing the parent FID used in the file name
 * page_count    number of entries in pga
 * pga           pages whose data is written out
 * server_cksum  checksum reported by the server
 * client_cksum  checksum computed by the client
 *
 * Nothing is returned; every failure is only logged.
 */
1772 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1773                                 struct brw_page **pga, __u32 server_cksum,
1774                                 __u32 client_cksum)
1775 {
1776         struct file *filp;
1777         int rc, i;
1778         unsigned int len;
1779         char *buf;
1780
1781         /* will only keep dump of pages on first error for the same range in
1782          * file/fid, not during the resends/retries. */
1783         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1784                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1785                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1786                   libcfs_debug_file_path_arr :
1787                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1788                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1789                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1790                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1791                  pga[0]->off,
1792                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1793                  client_cksum, server_cksum);
             /* O_EXCL: an already existing dump for the same name is kept, the
              * open fails with -EEXIST and that case is logged at D_INFO only */
1794         filp = filp_open(dbgcksum_file_name,
1795                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1796         if (IS_ERR(filp)) {
1797                 rc = PTR_ERR(filp);
1798                 if (rc == -EEXIST)
1799                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1800                                "checksum error: rc = %d\n", dbgcksum_file_name,
1801                                rc);
1802                 else
1803                         CERROR("%s: can't open to dump pages with checksum "
1804                                "error: rc = %d\n", dbgcksum_file_name, rc);
1805                 return;
1806         }
1807
1808         for (i = 0; i < page_count; i++) {
1809                 len = pga[i]->count;
                     /* NOTE(review): the write starts at the beginning of the
                      * mapped page; the in-page offset (off & ~PAGE_MASK) is
                      * not added -- confirm this is intended for partial
                      * pages. */
1810                 buf = kmap(pga[i]->pg);
1811                 while (len != 0) {
1812                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                             /* A zero-byte write makes no progress: len would
                              * stay unchanged and the loop would spin forever,
                              * so treat it like an error and give up on this
                              * page. */
1813                         if (rc <= 0) {
1814                                 CERROR("%s: wanted to write %u but got %d "
1815                                        "error\n", dbgcksum_file_name, len, rc);
1816                                 break;
1817                         }
1818                         len -= rc;
1819                         buf += rc;
1820                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1821                                dbgcksum_file_name, rc);
1822                 }
1823                 kunmap(pga[i]->pg);
1824         }
1825
1826         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1827         if (rc)
1828                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1829         filp_close(filp, NULL);
1830 }
1831
/*
 * Compare the client and server checksums of a bulk write and, on mismatch,
 * report the most likely cause on the console.
 *
 * oa            obdo whose o_flags carry the checksum type and whose parent
 *               FID / object id are printed (the caller in this file passes
 *               the obdo of the reply body)
 * peer          peer whose NID is printed in the message
 * client_cksum  checksum saved when the request was built
 * server_cksum  checksum returned by the server
 * aa            async args of the request (pages, sizes, original obdo)
 *
 * Returns 0 when both checksums are equal, 1 otherwise.  On mismatch the
 * pages are dumped first if cl_checksum_dump is set, then the checksum is
 * recomputed over the pages to tell the failure modes apart.
 */
1832 static int
1833 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1834                      __u32 client_cksum, __u32 server_cksum,
1835                      struct osc_brw_async_args *aa)
1836 {
1837         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1838         enum cksum_types cksum_type;
1839         obd_dif_csum_fn *fn = NULL;
1840         int sector_size = 0;
             /* Initialized because it is printed unconditionally in the console
              * message below, also when the recalculation fails (rc < 0) and
              * the helper may not have stored a value. */
1841         __u32 new_cksum = 0;
1842         char *msg;
1843         int rc;
1844
1845         if (server_cksum == client_cksum) {
1846                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1847                 return 0;
1848         }
1849
1850         if (aa->aa_cli->cl_checksum_dump)
1851                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1852                                     server_cksum, client_cksum);
1853
1854         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1855                                            oa->o_flags : 0);
1856
             /* The T10 checksum types need a guard function and a sector size;
              * for any other type fn stays NULL and osc_checksum_bulk() is
              * used. */
1857         switch (cksum_type) {
1858         case OBD_CKSUM_T10IP512:
1859                 fn = obd_dif_ip_fn;
1860                 sector_size = 512;
1861                 break;
1862         case OBD_CKSUM_T10IP4K:
1863                 fn = obd_dif_ip_fn;
1864                 sector_size = 4096;
1865                 break;
1866         case OBD_CKSUM_T10CRC512:
1867                 fn = obd_dif_crc_fn;
1868                 sector_size = 512;
1869                 break;
1870         case OBD_CKSUM_T10CRC4K:
1871                 fn = obd_dif_crc_fn;
1872                 sector_size = 4096;
1873                 break;
1874         default:
1875                 break;
1876         }
1877
1878         if (fn)
1879                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1880                                              aa->aa_page_count, aa->aa_ppga,
1881                                              OST_WRITE, fn, sector_size,
1882                                              &new_cksum);
1883         else
1884                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1885                                        aa->aa_ppga, OST_WRITE, cksum_type,
1886                                        &new_cksum);
1887
             /* Classify the mismatch by comparing the recomputed checksum with
              * the two checksums that disagreed. */
1888         if (rc < 0)
1889                 msg = "failed to calculate the client write checksum";
1890         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1891                 msg = "the server did not use the checksum type specified in "
1892                       "the original request - likely a protocol problem";
1893         else if (new_cksum == server_cksum)
1894                 msg = "changed on the client after we checksummed it - "
1895                       "likely false positive due to mmap IO (bug 11742)";
1896         else if (new_cksum == client_cksum)
1897                 msg = "changed in transit before arrival at OST";
1898         else
1899                 msg = "changed in transit AND doesn't match the original - "
1900                       "likely false positive due to mmap IO (bug 11742)";
1901
1902         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1903                            DFID " object "DOSTID" extent [%llu-%llu], original "
1904                            "client csum %x (type %x), server csum %x (type %x),"
1905                            " client csum now %x\n",
1906                            obd_name, msg, libcfs_nid2str(peer->nid),
1907                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1908                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1909                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1910                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1911                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1912                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1913                            client_cksum,
1914                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1915                            server_cksum, cksum_type, new_cksum);
1916         return 1;
1917 }
1918
1919 /*
      * Interpret the reply of a BRW (bulk read/write) request.
      *
      * Note rc enters this function as number of bytes transferred.  A negative
      * rc other than -EDQUOT is returned at once; -EDQUOT is returned only
      * after the quota flags and the grant from the reply have been processed.
      *
      * For OST_WRITE the function verifies the write checksum and finishes with
      * the result of check_write_rcs().  For OST_READ it copies short-io data
      * into the pages, handles short reads, verifies the read checksum and
      * decrypts the pages of encrypted inodes.
      *
      * Returns -EPROTO when the reply body cannot be unpacked or the reported
      * sizes are inconsistent, -EAGAIN when bulk unwrapping fails or a checksum
      * does not match, otherwise the rc left by the last step performed.  When
      * the function leaves through the "out" label with rc >= 0, the reply obdo
      * is copied into aa->aa_oa.
      */
1920 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1921 {
1922         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1923         struct client_obd *cli = aa->aa_cli;
1924         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1925         const struct lnet_process_id *peer =
1926                 &req->rq_import->imp_connection->c_peer;
1927         struct ost_body *body;
1928         u32 client_cksum = 0;
1929         struct inode *inode;
             /* stay 0 unless the inode is found through the cl_page below */
1930         unsigned int blockbits = 0, blocksize = 0;
1931
1932         ENTRY;
1933
1934         if (rc < 0 && rc != -EDQUOT) {
1935                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1936                 RETURN(rc);
1937         }
1938
1939         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1940         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1941         if (body == NULL) {
1942                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1943                 RETURN(-EPROTO);
1944         }
1945
1946         /* set/clear over quota flag for a uid/gid/projid */
1947         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1948             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1949                 unsigned qid[LL_MAXQUOTAS] = {
1950                                          body->oa.o_uid, body->oa.o_gid,
1951                                          body->oa.o_projid };
1952                 CDEBUG(D_QUOTA,
1953                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1954                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1955                        body->oa.o_valid, body->oa.o_flags);
1956                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1957                                        body->oa.o_flags);
1958         }
1959
1960         osc_update_grant(cli, body);
1961
             /* only -EDQUOT can still be negative here */
1962         if (rc < 0)
1963                 RETURN(rc);
1964
1965         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1966                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1967
1968         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1969                 if (rc > 0) {
1970                         CERROR("%s: unexpected positive size %d\n",
1971                                obd_name, rc);
1972                         RETURN(-EPROTO);
1973                 }
1974
1975                 if (req->rq_bulk != NULL &&
1976                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1977                         RETURN(-EAGAIN);
1978
                     /* a non-zero return from check_write_checksum() means the
                      * client and server checksums differ */
1979                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1980                     check_write_checksum(&body->oa, peer, client_cksum,
1981                                          body->oa.o_cksum, aa))
1982                         RETURN(-EAGAIN);
1983
1984                 rc = check_write_rcs(req, aa->aa_requested_nob,
1985                                      aa->aa_nio_count, aa->aa_page_count,
1986                                      aa->aa_ppga);
1987                 GOTO(out, rc);
1988         }
1989
1990         /* The rest of this function executes only for OST_READs */
1991
             /* no bulk descriptor: short io, the byte count is the size of the
              * RMF_SHORT_IO reply field */
1992         if (req->rq_bulk == NULL) {
1993                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1994                                           RCL_SERVER);
1995                 LASSERT(rc == req->rq_status);
1996         } else {
1997                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1998                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1999         }
2000         if (rc < 0)
2001                 GOTO(out, rc = -EAGAIN);
2002
2003         if (rc > aa->aa_requested_nob) {
2004                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2005                        rc, aa->aa_requested_nob);
2006                 RETURN(-EPROTO);
2007         }
2008
2009         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2010                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2011                        rc, req->rq_bulk->bd_nob_transferred);
2012                 RETURN(-EPROTO);
2013         }
2014
2015         if (req->rq_bulk == NULL) {
2016                 /* short io */
2017                 int nob, pg_count, i = 0;
2018                 unsigned char *buf;
2019
2020                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2021                 pg_count = aa->aa_page_count;
2022                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2023                                                    rc);
2024                 nob = rc;
                     /* copy the inline reply data into the pages, page by page,
                      * at each page's in-page offset, until either the data or
                      * the pages run out */
2025                 while (nob > 0 && pg_count > 0) {
2026                         unsigned char *ptr;
2027                         int count = aa->aa_ppga[i]->count > nob ?
2028                                     nob : aa->aa_ppga[i]->count;
2029
2030                         CDEBUG(D_CACHE, "page %p count %d\n",
2031                                aa->aa_ppga[i]->pg, count);
2032                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2033                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2034                                count);
2035                         kunmap_atomic((void *) ptr);
2036
2037                         buf += count;
2038                         nob -= count;
2039                         i++;
2040                         pg_count--;
2041                 }
2042         }
2043
2044         if (rc < aa->aa_requested_nob)
2045                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2046
2047         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2048                 static int cksum_counter;
2049                 u32        server_cksum = body->oa.o_cksum;
2050                 char      *via = "";
2051                 char      *router = "";
2052                 enum cksum_types cksum_type;
2053                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2054                         body->oa.o_flags : 0;
2055
                     /* checksum the received data with the type taken from the
                      * reply flags; client_cksum is overwritten with the
                      * result */
2056                 cksum_type = obd_cksum_type_unpack(o_flags);
2057                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2058                                           aa->aa_page_count, aa->aa_ppga,
2059                                           OST_READ, &client_cksum);
2060                 if (rc < 0)
2061                         GOTO(out, rc);
2062
                     /* the bulk sender differs from the peer: name it in the
                      * error message below */
2063                 if (req->rq_bulk != NULL &&
2064                     peer->nid != req->rq_bulk->bd_sender) {
2065                         via = " via ";
2066                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2067                 }
2068
2069                 if (server_cksum != client_cksum) {
2070                         struct ost_body *clbody;
2071                         u32 page_count = aa->aa_page_count;
2072
                             /* the FID printed and used for the dump comes from
                              * the request body, not from the reply */
2073                         clbody = req_capsule_client_get(&req->rq_pill,
2074                                                         &RMF_OST_BODY);
2075                         if (cli->cl_checksum_dump)
2076                                 dump_all_bulk_pages(&clbody->oa, page_count,
2077                                                     aa->aa_ppga, server_cksum,
2078                                                     client_cksum);
2079
2080                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2081                                            "%s%s%s inode "DFID" object "DOSTID
2082                                            " extent [%llu-%llu], client %x, "
2083                                            "server %x, cksum_type %x\n",
2084                                            obd_name,
2085                                            libcfs_nid2str(peer->nid),
2086                                            via, router,
2087                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2088                                                 clbody->oa.o_parent_seq : 0ULL,
2089                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2090                                                 clbody->oa.o_parent_oid : 0,
2091                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2092                                                 clbody->oa.o_parent_ver : 0,
2093                                            POSTID(&body->oa.o_oi),
2094                                            aa->aa_ppga[0]->off,
2095                                            aa->aa_ppga[page_count-1]->off +
2096                                            aa->aa_ppga[page_count-1]->count - 1,
2097                                            client_cksum, server_cksum,
2098                                            cksum_type);
2099                         cksum_counter = 0;
2100                         aa->aa_oa->o_cksum = client_cksum;
2101                         rc = -EAGAIN;
2102                 } else {
2103                         cksum_counter++;
2104                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2105                         rc = 0;
2106                 }
2107         } else if (unlikely(client_cksum)) {
2108                 static int cksum_missed;
2109
                     /* (x & -x) == x holds only when x is a power of two, so
                      * the error is logged on the 1st, 2nd, 4th, 8th, ... miss.
                      * NOTE(review): rc is not reset to 0 on this path, unlike
                      * the two neighbouring branches -- confirm intended. */
2110                 cksum_missed++;
2111                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2112                         CERROR("%s: checksum %u requested from %s but not sent\n",
2113                                obd_name, cksum_missed,
2114                                libcfs_nid2str(peer->nid));
2115         } else {
2116                 rc = 0;
2117         }
2118
2119         inode = page2inode(aa->aa_ppga[0]->pg);
2120         if (inode == NULL) {
2121                 /* Try to get reference to inode from cl_page if we are
2122                  * dealing with direct IO, as handled pages are not
2123                  * actual page cache pages.
2124                  */
2125                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2126
2127                 inode = oap2cl_page(oap)->cp_inode;
2128                 if (inode) {
2129                         blockbits = inode->i_blkbits;
2130                         blocksize = 1 << blockbits;
2131                 }
2132         }
2133         if (inode && IS_ENCRYPTED(inode)) {
2134                 int idx;
2135
2136                 if (!llcrypt_has_encryption_key(inode)) {
2137                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2138                         GOTO(out, rc);
2139                 }
                     /* walk every page in steps of LUSTRE_ENCRYPTION_UNIT_SIZE */
2140                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2141                         struct brw_page *pg = aa->aa_ppga[idx];
2142                         unsigned int offs = 0;
2143
2144                         while (offs < PAGE_SIZE) {
2145                                 /* do not decrypt if page is all 0s */
2146                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2147                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2148                                         /* if page is empty forward info to
2149                                          * upper layers (ll_io_zero_page) by
2150                                          * clearing PagePrivate2
2151                                          */
2152                                         if (!offs)
2153                                                 ClearPagePrivate2(pg->pg);
2154                                         break;
2155                                 }
2156
2157                                 if (blockbits) {
2158                                         /* This is direct IO case. Directly call
2159                                          * decrypt function that takes inode as
2160                                          * input parameter. Page does not need
2161                                          * to be locked.
2162                                          */
2163                                         u64 lblk_num =
2164                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2165                                                      (PAGE_SHIFT - blockbits)) +
2166                                                        (offs >> blockbits);
2167                                         unsigned int i;
2168
                                             /* decrypt the unit one fs block at
                                              * a time; stop at the first error */
2169                                         for (i = offs;
2170                                              i < offs +
2171                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2172                                              i += blocksize, lblk_num++) {
2173                                                 rc =
2174                                                   llcrypt_decrypt_block_inplace(
2175                                                           inode, pg->pg,
2176                                                           blocksize, i,
2177                                                           lblk_num);
2178                                                 if (rc)
2179                                                         break;
2180                                         }
2181                                 } else {
2182                                         rc = llcrypt_decrypt_pagecache_blocks(
2183                                                 pg->pg,
2184                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2185                                                 offs);
2186                                 }
2187                                 if (rc)
2188                                         GOTO(out, rc);
2189
2190                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2191                         }
2192                 }
2193         }
2194
2195 out:
             /* on success hand the attributes of the reply back to the caller's
              * obdo */
2196         if (rc >= 0)
2197                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2198                                      aa->aa_oa, &body->oa);
2199
2200         RETURN(rc);
2201 }
2202
/*
 * Rebuild and requeue a BRW request after a recoverable error.
 *
 * A new request is prepared for the same client, obdo and page array as the
 * old one.  It inherits the reply interpreter, the async args and the commit
 * callback of request, takes over its aa_oaps and aa_exts lists and is handed
 * to ptlrpcd_add_req().  Every call increments aa_resends, which also serves
 * as the delay in seconds (capped at rq_timeout) added to the current time
 * for rq_sent.
 *
 * request  the request being redone
 * aa       async args of request
 * rc       the error that triggered the redo; only used for the log message
 *          before being overwritten
 *
 * Returns 0 on success or the error returned by osc_brw_prep_request().
 */
2203 static int osc_brw_redo_request(struct ptlrpc_request *request,
2204                                 struct osc_brw_async_args *aa, int rc)
2205 {
2206         struct ptlrpc_request *new_req;
2207         struct osc_brw_async_args *new_aa;
2208         struct osc_async_page *oap;
2209         ENTRY;
2210
2211         /* The below message is checked in replay-ost-single.sh test_8ae */
2212         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2213                   "redo for recoverable error %d", rc);
2214
             /* final argument 1: presumably the resend flag -- confirm against
              * the signature of osc_brw_prep_request() */
2215         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2216                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2217                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2218                                   aa->aa_ppga, &new_req, 1);
2219         if (rc)
2220                 RETURN(rc);
2221
             /* sanity check: any oap that references a request must reference
              * the one being redone */
2222         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2223                 if (oap->oap_request != NULL) {
2224                         LASSERTF(request == oap->oap_request,
2225                                  "request %p != oap_request %p\n",
2226                                  request, oap->oap_request);
2227                 }
2228         }
2229         /*
2230          * New request takes over pga and oaps from old request.
2231          * Note that copying a list_head doesn't work, need to move it...
2232          */
2233         aa->aa_resends++;
2234         new_req->rq_interpret_reply = request->rq_interpret_reply;
2235         new_req->rq_async_args = request->rq_async_args;
2236         new_req->rq_commit_cb = request->rq_commit_cb;
2237         /* cap resend delay to the current request timeout, this is similar to
2238          * what ptlrpc does (see after_reply()) */
2239         if (aa->aa_resends > new_req->rq_timeout)
2240                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2241         else
2242                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2243         new_req->rq_generation_set = 1;
2244         new_req->rq_import_generation = request->rq_import_generation;
2245
2246         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2247
             /* the list heads copied with rq_async_args above are re-initialized
              * here and the entries are then moved over from the old request */
2248         INIT_LIST_HEAD(&new_aa->aa_oaps);
2249         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2250         INIT_LIST_HEAD(&new_aa->aa_exts);
2251         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2252         new_aa->aa_resends = aa->aa_resends;
2253
             /* swap the request reference held by each oap: drop the one on the
              * old request and take one on the new request */
2254         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2255                 if (oap->oap_request) {
2256                         ptlrpc_req_finished(oap->oap_request);
2257                         oap->oap_request = ptlrpc_request_addref(new_req);
2258                 }
2259         }
2260
2261         /* XXX: This code will run into problem if we're going to support
2262          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
2263          * and wait for all of them to be finished. We should inherit request
2264          * set from old request. */
2265         ptlrpcd_add_req(new_req);
2266
2267         DEBUG_REQ(D_INFO, new_req, "new request");
2268         RETURN(0);
2269 }
2270
2271 /*
2272  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2273  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2274  * fine for our small page arrays and doesn't require allocation.  its an
2275  * insertion sort that swaps elements that are strides apart, shrinking the
2276  * stride down until its '1' and the array is sorted.
2277  */
2278 static void sort_brw_pages(struct brw_page **array, int num)
2279 {
2280         int stride, i, j;
2281         struct brw_page *tmp;
2282
2283         if (num == 1)
2284                 return;
2285         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2286                 ;
2287
2288         do {
2289                 stride /= 3;
2290                 for (i = stride ; i < num ; i++) {
2291                         tmp = array[i];
2292                         j = i;
2293                         while (j >= stride && array[j - stride]->off > tmp->off) {
2294                                 array[j] = array[j - stride];
2295                                 j -= stride;
2296                         }
2297                         array[j] = tmp;
2298                 }
2299         } while (stride > 1);
2300 }
2301
2302 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2303 {
2304         LASSERT(ppga != NULL);
2305         OBD_FREE_PTR_ARRAY(ppga, count);
2306 }
2307
2308 static int brw_interpret(const struct lu_env *env,
2309                          struct ptlrpc_request *req, void *args, int rc)
2310 {
2311         struct osc_brw_async_args *aa = args;
2312         struct osc_extent *ext;
2313         struct osc_extent *tmp;
2314         struct client_obd *cli = aa->aa_cli;
2315         unsigned long transferred = 0;
2316
2317         ENTRY;
2318
2319         rc = osc_brw_fini_request(req, rc);
2320         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2321
2322         /* restore clear text pages */
2323         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2324
2325         /*
2326          * When server returns -EINPROGRESS, client should always retry
2327          * regardless of the number of times the bulk was resent already.
2328          */
2329         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2330                 if (req->rq_import_generation !=
2331                     req->rq_import->imp_generation) {
2332                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2333                                ""DOSTID", rc = %d.\n",
2334                                req->rq_import->imp_obd->obd_name,
2335                                POSTID(&aa->aa_oa->o_oi), rc);
2336                 } else if (rc == -EINPROGRESS ||
2337                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2338                         rc = osc_brw_redo_request(req, aa, rc);
2339                 } else {
2340                         CERROR("%s: too many resent retries for object: "
2341                                "%llu:%llu, rc = %d.\n",
2342                                req->rq_import->imp_obd->obd_name,
2343                                POSTID(&aa->aa_oa->o_oi), rc);
2344                 }
2345
2346                 if (rc == 0)
2347                         RETURN(0);
2348                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2349                         rc = -EIO;
2350         }
2351
2352         if (rc == 0) {
2353                 struct obdo *oa = aa->aa_oa;
2354                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2355                 unsigned long valid = 0;
2356                 struct cl_object *obj;
2357                 struct osc_async_page *last;
2358
2359                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2360                 obj = osc2cl(last->oap_obj);
2361
2362                 cl_object_attr_lock(obj);
2363                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2364                         attr->cat_blocks = oa->o_blocks;
2365                         valid |= CAT_BLOCKS;
2366                 }
2367                 if (oa->o_valid & OBD_MD_FLMTIME) {
2368                         attr->cat_mtime = oa->o_mtime;
2369                         valid |= CAT_MTIME;
2370                 }
2371                 if (oa->o_valid & OBD_MD_FLATIME) {
2372                         attr->cat_atime = oa->o_atime;
2373                         valid |= CAT_ATIME;
2374                 }
2375                 if (oa->o_valid & OBD_MD_FLCTIME) {
2376                         attr->cat_ctime = oa->o_ctime;
2377                         valid |= CAT_CTIME;
2378                 }
2379
2380                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2381                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2382                         loff_t last_off = last->oap_count + last->oap_obj_off +
2383                                 last->oap_page_off;
2384
2385                         /* Change file size if this is an out of quota or
2386                          * direct IO write and it extends the file size */
2387                         if (loi->loi_lvb.lvb_size < last_off) {
2388                                 attr->cat_size = last_off;
2389                                 valid |= CAT_SIZE;
2390                         }
2391                         /* Extend KMS if it's not a lockless write */
2392                         if (loi->loi_kms < last_off &&
2393                             oap2osc_page(last)->ops_srvlock == 0) {
2394                                 attr->cat_kms = last_off;
2395                                 valid |= CAT_KMS;
2396                         }
2397                 }
2398
2399                 if (valid != 0)
2400                         cl_object_attr_update(env, obj, attr, valid);
2401                 cl_object_attr_unlock(obj);
2402         }
2403         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2404         aa->aa_oa = NULL;
2405
2406         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2407                 osc_inc_unstable_pages(req);
2408
2409         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2410                 list_del_init(&ext->oe_link);
2411                 osc_extent_finish(env, ext, 1,
2412                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2413         }
2414         LASSERT(list_empty(&aa->aa_exts));
2415         LASSERT(list_empty(&aa->aa_oaps));
2416
2417         transferred = (req->rq_bulk == NULL ? /* short io */
2418                        aa->aa_requested_nob :
2419                        req->rq_bulk->bd_nob_transferred);
2420
2421         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2422         ptlrpc_lprocfs_brw(req, transferred);
2423
2424         spin_lock(&cli->cl_loi_list_lock);
2425         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2426          * is called so we know whether to go to sync BRWs or wait for more
2427          * RPCs to complete */
2428         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2429                 cli->cl_w_in_flight--;
2430         else
2431                 cli->cl_r_in_flight--;
2432         osc_wake_cache_waiters(cli);
2433         spin_unlock(&cli->cl_loi_list_lock);
2434
2435         osc_io_unplug(env, cli, NULL);
2436         RETURN(rc);
2437 }
2438
2439 static void brw_commit(struct ptlrpc_request *req)
2440 {
2441         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2442          * this called via the rq_commit_cb, I need to ensure
2443          * osc_dec_unstable_pages is still called. Otherwise unstable
2444          * pages may be leaked. */
2445         spin_lock(&req->rq_lock);
2446         if (likely(req->rq_unstable)) {
2447                 req->rq_unstable = 0;
2448                 spin_unlock(&req->rq_lock);
2449
2450                 osc_dec_unstable_pages(req);
2451         } else {
2452                 req->rq_committed = 1;
2453                 spin_unlock(&req->rq_lock);
2454         }
2455 }
2456
2457 /**
2458  * Build an RPC by the list of extent @ext_list. The caller must ensure
2459  * that the total pages in this list are NOT over max pages per RPC.
2460  * Extents in the list must be in OES_RPC state.
2461  */
2462 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2463                   struct list_head *ext_list, int cmd)
2464 {
2465         struct ptlrpc_request           *req = NULL;
2466         struct osc_extent               *ext;
2467         struct brw_page                 **pga = NULL;
2468         struct osc_brw_async_args       *aa = NULL;
2469         struct obdo                     *oa = NULL;
2470         struct osc_async_page           *oap;
2471         struct osc_object               *obj = NULL;
2472         struct cl_req_attr              *crattr = NULL;
2473         loff_t                          starting_offset = OBD_OBJECT_EOF;
2474         loff_t                          ending_offset = 0;
2475         /* '1' for consistency with code that checks !mpflag to restore */
2476         int mpflag = 1;
2477         int                             mem_tight = 0;
2478         int                             page_count = 0;
2479         bool                            soft_sync = false;
2480         bool                            ndelay = false;
2481         int                             i;
2482         int                             grant = 0;
2483         int                             rc;
2484         __u32                           layout_version = 0;
2485         LIST_HEAD(rpc_list);
2486         struct ost_body                 *body;
2487         ENTRY;
2488         LASSERT(!list_empty(ext_list));
2489
2490         /* add pages into rpc_list to build BRW rpc */
2491         list_for_each_entry(ext, ext_list, oe_link) {
2492                 LASSERT(ext->oe_state == OES_RPC);
2493                 mem_tight |= ext->oe_memalloc;
2494                 grant += ext->oe_grants;
2495                 page_count += ext->oe_nr_pages;
2496                 layout_version = max(layout_version, ext->oe_layout_version);
2497                 if (obj == NULL)
2498                         obj = ext->oe_obj;
2499         }
2500
2501         soft_sync = osc_over_unstable_soft_limit(cli);
2502         if (mem_tight)
2503                 mpflag = memalloc_noreclaim_save();
2504
2505         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2506         if (pga == NULL)
2507                 GOTO(out, rc = -ENOMEM);
2508
2509         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2510         if (oa == NULL)
2511                 GOTO(out, rc = -ENOMEM);
2512
2513         i = 0;
2514         list_for_each_entry(ext, ext_list, oe_link) {
2515                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2516                         if (mem_tight)
2517                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2518                         if (soft_sync)
2519                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2520                         pga[i] = &oap->oap_brw_page;
2521                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2522                         i++;
2523
2524                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2525                         if (starting_offset == OBD_OBJECT_EOF ||
2526                             starting_offset > oap->oap_obj_off)
2527                                 starting_offset = oap->oap_obj_off;
2528                         else
2529                                 LASSERT(oap->oap_page_off == 0);
2530                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2531                                 ending_offset = oap->oap_obj_off +
2532                                                 oap->oap_count;
2533                         else
2534                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2535                                         PAGE_SIZE);
2536                 }
2537                 if (ext->oe_ndelay)
2538                         ndelay = true;
2539         }
2540
2541         /* first page in the list */
2542         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2543
2544         crattr = &osc_env_info(env)->oti_req_attr;
2545         memset(crattr, 0, sizeof(*crattr));
2546         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2547         crattr->cra_flags = ~0ULL;
2548         crattr->cra_page = oap2cl_page(oap);
2549         crattr->cra_oa = oa;
2550         cl_req_attr_set(env, osc2cl(obj), crattr);
2551
2552         if (cmd == OBD_BRW_WRITE) {
2553                 oa->o_grant_used = grant;
2554                 if (layout_version > 0) {
2555                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2556                                PFID(&oa->o_oi.oi_fid), layout_version);
2557
2558                         oa->o_layout_version = layout_version;
2559                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2560                 }
2561         }
2562
2563         sort_brw_pages(pga, page_count);
2564         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2565         if (rc != 0) {
2566                 CERROR("prep_req failed: %d\n", rc);
2567                 GOTO(out, rc);
2568         }
2569
2570         req->rq_commit_cb = brw_commit;
2571         req->rq_interpret_reply = brw_interpret;
2572         req->rq_memalloc = mem_tight != 0;
2573         oap->oap_request = ptlrpc_request_addref(req);
2574         if (ndelay) {
2575                 req->rq_no_resend = req->rq_no_delay = 1;
2576                 /* probably set a shorter timeout value.
2577                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2578                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2579         }
2580
2581         /* Need to update the timestamps after the request is built in case
2582          * we race with setattr (locally or in queue at OST).  If OST gets
2583          * later setattr before earlier BRW (as determined by the request xid),
2584          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2585          * way to do this in a single call.  bug 10150 */
2586         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2587         crattr->cra_oa = &body->oa;
2588         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2589         cl_req_attr_set(env, osc2cl(obj), crattr);
2590         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2591
2592         aa = ptlrpc_req_async_args(aa, req);
2593         INIT_LIST_HEAD(&aa->aa_oaps);
2594         list_splice_init(&rpc_list, &aa->aa_oaps);
2595         INIT_LIST_HEAD(&aa->aa_exts);
2596         list_splice_init(ext_list, &aa->aa_exts);
2597
2598         spin_lock(&cli->cl_loi_list_lock);
2599         starting_offset >>= PAGE_SHIFT;
2600         if (cmd == OBD_BRW_READ) {
2601                 cli->cl_r_in_flight++;
2602                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2603                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2604                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2605                                       starting_offset + 1);
2606         } else {
2607                 cli->cl_w_in_flight++;
2608                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2609                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2610                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2611                                       starting_offset + 1);
2612         }
2613         spin_unlock(&cli->cl_loi_list_lock);
2614
2615         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2616                   page_count, aa, cli->cl_r_in_flight,
2617                   cli->cl_w_in_flight);
2618         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2619
2620         ptlrpcd_add_req(req);
2621         rc = 0;
2622         EXIT;
2623
2624 out:
2625         if (mem_tight)
2626                 memalloc_noreclaim_restore(mpflag);
2627
2628         if (rc != 0) {
2629                 LASSERT(req == NULL);
2630
2631                 if (oa)
2632                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2633                 if (pga) {
2634                         osc_release_bounce_pages(pga, page_count);
2635                         osc_release_ppga(pga, page_count);
2636                 }
2637                 /* this should happen rarely and is pretty bad, it makes the
2638                  * pending list not follow the dirty order */
2639                 while (!list_empty(ext_list)) {
2640                         ext = list_entry(ext_list->next, struct osc_extent,
2641                                          oe_link);
2642                         list_del_init(&ext->oe_link);
2643                         osc_extent_finish(env, ext, 0, rc);
2644                 }
2645         }
2646         RETURN(rc);
2647 }
2648
2649 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2650 {
2651         int set = 0;
2652
2653         LASSERT(lock != NULL);
2654
2655         lock_res_and_lock(lock);
2656
2657         if (lock->l_ast_data == NULL)
2658                 lock->l_ast_data = data;
2659         if (lock->l_ast_data == data)
2660                 set = 1;
2661
2662         unlock_res_and_lock(lock);
2663
2664         return set;
2665 }
2666
2667 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2668                      void *cookie, struct lustre_handle *lockh,
2669                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2670                      int errcode)
2671 {
2672         bool intent = *flags & LDLM_FL_HAS_INTENT;
2673         int rc;
2674         ENTRY;
2675
2676         /* The request was created before ldlm_cli_enqueue call. */
2677         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2678                 struct ldlm_reply *rep;
2679
2680                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2681                 LASSERT(rep != NULL);
2682
2683                 rep->lock_policy_res1 =
2684                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2685                 if (rep->lock_policy_res1)
2686                         errcode = rep->lock_policy_res1;
2687                 if (!speculative)
2688                         *flags |= LDLM_FL_LVB_READY;
2689         } else if (errcode == ELDLM_OK) {
2690                 *flags |= LDLM_FL_LVB_READY;
2691         }
2692
2693         /* Call the update callback. */
2694         rc = (*upcall)(cookie, lockh, errcode);
2695
2696         /* release the reference taken in ldlm_cli_enqueue() */
2697         if (errcode == ELDLM_LOCK_MATCHED)
2698                 errcode = ELDLM_OK;
2699         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2700                 ldlm_lock_decref(lockh, mode);
2701
2702         RETURN(rc);
2703 }
2704
2705 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2706                           void *args, int rc)
2707 {
2708         struct osc_enqueue_args *aa = args;
2709         struct ldlm_lock *lock;
2710         struct lustre_handle *lockh = &aa->oa_lockh;
2711         enum ldlm_mode mode = aa->oa_mode;
2712         struct ost_lvb *lvb = aa->oa_lvb;
2713         __u32 lvb_len = sizeof(*lvb);
2714         __u64 flags = 0;
2715         struct ldlm_enqueue_info einfo = {
2716                 .ei_type = aa->oa_type,
2717                 .ei_mode = mode,
2718         };
2719
2720         ENTRY;
2721
2722         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2723          * be valid. */
2724         lock = ldlm_handle2lock(lockh);
2725         LASSERTF(lock != NULL,
2726                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2727                  lockh->cookie, req, aa);
2728
2729         /* Take an additional reference so that a blocking AST that
2730          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2731          * to arrive after an upcall has been executed by
2732          * osc_enqueue_fini(). */
2733         ldlm_lock_addref(lockh, mode);
2734
2735         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2736         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2737
2738         /* Let CP AST to grant the lock first. */
2739         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2740
2741         if (aa->oa_speculative) {
2742                 LASSERT(aa->oa_lvb == NULL);
2743                 LASSERT(aa->oa_flags == NULL);
2744                 aa->oa_flags = &flags;
2745         }
2746
2747         /* Complete obtaining the lock procedure. */
2748         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2749                                    lvb, lvb_len, lockh, rc);
2750         /* Complete osc stuff. */
2751         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2752                               aa->oa_flags, aa->oa_speculative, rc);
2753
2754         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2755
2756         ldlm_lock_decref(lockh, mode);
2757         LDLM_LOCK_PUT(lock);
2758         RETURN(rc);
2759 }
2760
2761 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2762  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2763  * other synchronous requests, however keeping some locks and trying to obtain
2764  * others may take a considerable amount of time in a case of ost failure; and
2765  * when other sync requests do not get released lock from a client, the client
2766  * is evicted from the cluster -- such scenarious make the life difficult, so
2767  * release locks just after they are obtained. */
2768 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2769                      __u64 *flags, union ldlm_policy_data *policy,
2770                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2771                      void *cookie, struct ldlm_enqueue_info *einfo,
2772                      struct ptlrpc_request_set *rqset, int async,
2773                      bool speculative)
2774 {
2775         struct obd_device *obd = exp->exp_obd;
2776         struct lustre_handle lockh = { 0 };
2777         struct ptlrpc_request *req = NULL;
2778         int intent = *flags & LDLM_FL_HAS_INTENT;
2779         __u64 match_flags = *flags;
2780         enum ldlm_mode mode;
2781         int rc;
2782         ENTRY;
2783
2784         /* Filesystem lock extents are extended to page boundaries so that
2785          * dealing with the page cache is a little smoother.  */
2786         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2787         policy->l_extent.end |= ~PAGE_MASK;
2788
2789         /* Next, search for already existing extent locks that will cover us */
2790         /* If we're trying to read, we also search for an existing PW lock.  The
2791          * VFS and page cache already protect us locally, so lots of readers/
2792          * writers can share a single PW lock.
2793          *
2794          * There are problems with conversion deadlocks, so instead of
2795          * converting a read lock to a write lock, we'll just enqueue a new
2796          * one.
2797          *
2798          * At some point we should cancel the read lock instead of making them
2799          * send us a blocking callback, but there are problems with canceling
2800          * locks out from other users right now, too. */
2801         mode = einfo->ei_mode;
2802         if (einfo->ei_mode == LCK_PR)
2803                 mode |= LCK_PW;
2804         /* Normal lock requests must wait for the LVB to be ready before
2805          * matching a lock; speculative lock requests do not need to,
2806          * because they will not actually use the lock. */
2807         if (!speculative)
2808                 match_flags |= LDLM_FL_LVB_READY;
2809         if (intent != 0)
2810                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2811         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2812                                einfo->ei_type, policy, mode, &lockh);
2813         if (mode) {
2814                 struct ldlm_lock *matched;
2815
2816                 if (*flags & LDLM_FL_TEST_LOCK)
2817                         RETURN(ELDLM_OK);
2818
2819                 matched = ldlm_handle2lock(&lockh);
2820                 if (speculative) {
2821                         /* This DLM lock request is speculative, and does not
2822                          * have an associated IO request. Therefore if there
2823                          * is already a DLM lock, it wll just inform the
2824                          * caller to cancel the request for this stripe.*/
2825                         lock_res_and_lock(matched);
2826                         if (ldlm_extent_equal(&policy->l_extent,
2827                             &matched->l_policy_data.l_extent))
2828                                 rc = -EEXIST;
2829                         else
2830                                 rc = -ECANCELED;
2831                         unlock_res_and_lock(matched);
2832
2833                         ldlm_lock_decref(&lockh, mode);
2834                         LDLM_LOCK_PUT(matched);
2835                         RETURN(rc);
2836                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2837                         *flags |= LDLM_FL_LVB_READY;
2838
2839                         /* We already have a lock, and it's referenced. */
2840                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2841
2842                         ldlm_lock_decref(&lockh, mode);
2843                         LDLM_LOCK_PUT(matched);
2844                         RETURN(ELDLM_OK);
2845                 } else {
2846                         ldlm_lock_decref(&lockh, mode);
2847                         LDLM_LOCK_PUT(matched);
2848                 }
2849         }
2850
2851         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2852                 RETURN(-ENOLCK);
2853
2854         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2855         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2856
2857         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2858                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2859         if (async) {
2860                 if (!rc) {
2861                         struct osc_enqueue_args *aa;
2862                         aa = ptlrpc_req_async_args(aa, req);
2863                         aa->oa_exp         = exp;
2864                         aa->oa_mode        = einfo->ei_mode;
2865                         aa->oa_type        = einfo->ei_type;
2866                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2867                         aa->oa_upcall      = upcall;
2868                         aa->oa_cookie      = cookie;
2869                         aa->oa_speculative = speculative;
2870                         if (!speculative) {
2871                                 aa->oa_flags  = flags;
2872                                 aa->oa_lvb    = lvb;
2873                         } else {
2874                                 /* speculative locks are essentially to enqueue
2875                                  * a DLM lock  in advance, so we don't care
2876                                  * about the result of the enqueue. */
2877                                 aa->oa_lvb    = NULL;
2878                                 aa->oa_flags  = NULL;
2879                         }
2880
2881                         req->rq_interpret_reply = osc_enqueue_interpret;
2882                         ptlrpc_set_add_req(rqset, req);
2883                 }
2884                 RETURN(rc);
2885         }
2886
2887         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2888                               flags, speculative, rc);
2889
2890         RETURN(rc);
2891 }
2892
2893 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2894                    struct ldlm_res_id *res_id, enum ldlm_type type,
2895                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2896                    __u64 *flags, struct osc_object *obj,
2897                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2898 {
2899         struct obd_device *obd = exp->exp_obd;
2900         __u64 lflags = *flags;
2901         enum ldlm_mode rc;
2902         ENTRY;
2903
2904         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2905                 RETURN(-EIO);
2906
2907         /* Filesystem lock extents are extended to page boundaries so that
2908          * dealing with the page cache is a little smoother */
2909         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2910         policy->l_extent.end |= ~PAGE_MASK;
2911
2912         /* Next, search for already existing extent locks that will cover us */
2913         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2914                                         res_id, type, policy, mode, lockh,
2915                                         match_flags);
2916         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2917                 RETURN(rc);
2918
2919         if (obj != NULL) {
2920                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2921
2922                 LASSERT(lock != NULL);
2923                 if (osc_set_lock_data(lock, obj)) {
2924                         lock_res_and_lock(lock);
2925                         if (!ldlm_is_lvb_cached(lock)) {
2926                                 LASSERT(lock->l_ast_data == obj);
2927                                 osc_lock_lvb_update(env, obj, lock, NULL);
2928                                 ldlm_set_lvb_cached(lock);
2929                         }
2930                         unlock_res_and_lock(lock);
2931                 } else {
2932                         ldlm_lock_decref(lockh, rc);
2933                         rc = 0;
2934                 }
2935                 LDLM_LOCK_PUT(lock);
2936         }
2937         RETURN(rc);
2938 }
2939
2940 static int osc_statfs_interpret(const struct lu_env *env,
2941                                 struct ptlrpc_request *req, void *args, int rc)
2942 {
2943         struct osc_async_args *aa = args;
2944         struct obd_statfs *msfs;
2945
2946         ENTRY;
2947         if (rc == -EBADR)
2948                 /*
2949                  * The request has in fact never been sent due to issues at
2950                  * a higher level (LOV).  Exit immediately since the caller
2951                  * is aware of the problem and takes care of the clean up.
2952                  */
2953                 RETURN(rc);
2954
2955         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2956             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2957                 GOTO(out, rc = 0);
2958
2959         if (rc != 0)
2960                 GOTO(out, rc);
2961
2962         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2963         if (msfs == NULL)
2964                 GOTO(out, rc = -EPROTO);
2965
2966         *aa->aa_oi->oi_osfs = *msfs;
2967 out:
2968         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2969
2970         RETURN(rc);
2971 }
2972
2973 static int osc_statfs_async(struct obd_export *exp,
2974                             struct obd_info *oinfo, time64_t max_age,
2975                             struct ptlrpc_request_set *rqset)
2976 {
2977         struct obd_device     *obd = class_exp2obd(exp);
2978         struct ptlrpc_request *req;
2979         struct osc_async_args *aa;
2980         int rc;
2981         ENTRY;
2982
2983         if (obd->obd_osfs_age >= max_age) {
2984                 CDEBUG(D_SUPER,
2985                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2986                        obd->obd_name, &obd->obd_osfs,
2987                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2988                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2989                 spin_lock(&obd->obd_osfs_lock);
2990                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2991                 spin_unlock(&obd->obd_osfs_lock);
2992                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2993                 if (oinfo->oi_cb_up)
2994                         oinfo->oi_cb_up(oinfo, 0);
2995
2996                 RETURN(0);
2997         }
2998
2999         /* We could possibly pass max_age in the request (as an absolute
3000          * timestamp or a "seconds.usec ago") so the target can avoid doing
3001          * extra calls into the filesystem if that isn't necessary (e.g.
3002          * during mount that would help a bit).  Having relative timestamps
3003          * is not so great if request processing is slow, while absolute
3004          * timestamps are not ideal because they need time synchronization. */
3005         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3006         if (req == NULL)
3007                 RETURN(-ENOMEM);
3008
3009         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3010         if (rc) {
3011                 ptlrpc_request_free(req);
3012                 RETURN(rc);
3013         }
3014         ptlrpc_request_set_replen(req);
3015         req->rq_request_portal = OST_CREATE_PORTAL;
3016         ptlrpc_at_set_req_timeout(req);
3017
3018         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3019                 /* procfs requests not want stat in wait for avoid deadlock */
3020                 req->rq_no_resend = 1;
3021                 req->rq_no_delay = 1;
3022         }
3023
3024         req->rq_interpret_reply = osc_statfs_interpret;
3025         aa = ptlrpc_req_async_args(aa, req);
3026         aa->aa_oi = oinfo;
3027
3028         ptlrpc_set_add_req(rqset, req);
3029         RETURN(0);
3030 }
3031
3032 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3033                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3034 {
3035         struct obd_device     *obd = class_exp2obd(exp);
3036         struct obd_statfs     *msfs;
3037         struct ptlrpc_request *req;
3038         struct obd_import     *imp = NULL;
3039         int rc;
3040         ENTRY;
3041
3042
3043         /*Since the request might also come from lprocfs, so we need
3044          *sync this with client_disconnect_export Bug15684*/
3045         down_read(&obd->u.cli.cl_sem);
3046         if (obd->u.cli.cl_import)
3047                 imp = class_import_get(obd->u.cli.cl_import);
3048         up_read(&obd->u.cli.cl_sem);
3049         if (!imp)
3050                 RETURN(-ENODEV);
3051
3052         /* We could possibly pass max_age in the request (as an absolute
3053          * timestamp or a "seconds.usec ago") so the target can avoid doing
3054          * extra calls into the filesystem if that isn't necessary (e.g.
3055          * during mount that would help a bit).  Having relative timestamps
3056          * is not so great if request processing is slow, while absolute
3057          * timestamps are not ideal because they need time synchronization. */
3058         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3059
3060         class_import_put(imp);
3061
3062         if (req == NULL)
3063                 RETURN(-ENOMEM);
3064
3065         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3066         if (rc) {
3067                 ptlrpc_request_free(req);
3068                 RETURN(rc);
3069         }
3070         ptlrpc_request_set_replen(req);
3071         req->rq_request_portal = OST_CREATE_PORTAL;
3072         ptlrpc_at_set_req_timeout(req);
3073
3074         if (flags & OBD_STATFS_NODELAY) {
3075                 /* procfs requests not want stat in wait for avoid deadlock */
3076                 req->rq_no_resend = 1;
3077                 req->rq_no_delay = 1;
3078         }
3079
3080         rc = ptlrpc_queue_wait(req);
3081         if (rc)
3082                 GOTO(out, rc);
3083
3084         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3085         if (msfs == NULL)
3086                 GOTO(out, rc = -EPROTO);
3087
3088         *osfs = *msfs;
3089
3090         EXIT;
3091 out:
3092         ptlrpc_req_finished(req);
3093         return rc;
3094 }
3095
3096 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3097                          void *karg, void __user *uarg)
3098 {
3099         struct obd_device *obd = exp->exp_obd;
3100         struct obd_ioctl_data *data = karg;
3101         int rc = 0;
3102
3103         ENTRY;
3104         if (!try_module_get(THIS_MODULE)) {
3105                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3106                        module_name(THIS_MODULE));
3107                 return -EINVAL;
3108         }
3109         switch (cmd) {
3110         case OBD_IOC_CLIENT_RECOVER:
3111                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3112                                            data->ioc_inlbuf1, 0);
3113                 if (rc > 0)
3114                         rc = 0;
3115                 break;
3116         case IOC_OSC_SET_ACTIVE:
3117                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3118                                               data->ioc_offset);
3119                 break;
3120         default:
3121                 rc = -ENOTTY;
3122                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3123                        obd->obd_name, cmd, current->comm, rc);
3124                 break;
3125         }
3126
3127         module_put(THIS_MODULE);
3128         return rc;
3129 }
3130
3131 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3132                        u32 keylen, void *key, u32 vallen, void *val,
3133                        struct ptlrpc_request_set *set)
3134 {
3135         struct ptlrpc_request *req;
3136         struct obd_device     *obd = exp->exp_obd;
3137         struct obd_import     *imp = class_exp2cliimp(exp);
3138         char                  *tmp;
3139         int                    rc;
3140         ENTRY;
3141
3142         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3143
3144         if (KEY_IS(KEY_CHECKSUM)) {
3145                 if (vallen != sizeof(int))
3146                         RETURN(-EINVAL);
3147                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3148                 RETURN(0);
3149         }
3150
3151         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3152                 sptlrpc_conf_client_adapt(obd);
3153                 RETURN(0);
3154         }
3155
3156         if (KEY_IS(KEY_FLUSH_CTX)) {
3157                 sptlrpc_import_flush_my_ctx(imp);
3158                 RETURN(0);
3159         }
3160
3161         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3162                 struct client_obd *cli = &obd->u.cli;
3163                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3164                 long target = *(long *)val;
3165
3166                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3167                 *(long *)val -= nr;
3168                 RETURN(0);
3169         }
3170
3171         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3172                 RETURN(-EINVAL);
3173
3174         /* We pass all other commands directly to OST. Since nobody calls osc
3175            methods directly and everybody is supposed to go through LOV, we
3176            assume lov checked invalid values for us.
3177            The only recognised values so far are evict_by_nid and mds_conn.
3178            Even if something bad goes through, we'd get a -EINVAL from OST
3179            anyway. */
3180
3181         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3182                                                 &RQF_OST_SET_GRANT_INFO :
3183                                                 &RQF_OBD_SET_INFO);
3184         if (req == NULL)
3185                 RETURN(-ENOMEM);
3186
3187         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3188                              RCL_CLIENT, keylen);
3189         if (!KEY_IS(KEY_GRANT_SHRINK))
3190                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3191                                      RCL_CLIENT, vallen);
3192         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3193         if (rc) {
3194                 ptlrpc_request_free(req);
3195                 RETURN(rc);
3196         }
3197
3198         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3199         memcpy(tmp, key, keylen);
3200         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3201                                                         &RMF_OST_BODY :
3202                                                         &RMF_SETINFO_VAL);
3203         memcpy(tmp, val, vallen);
3204
3205         if (KEY_IS(KEY_GRANT_SHRINK)) {
3206                 struct osc_grant_args *aa;
3207                 struct obdo *oa;
3208
3209                 aa = ptlrpc_req_async_args(aa, req);
3210                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3211                 if (!oa) {
3212                         ptlrpc_req_finished(req);
3213                         RETURN(-ENOMEM);
3214                 }
3215                 *oa = ((struct ost_body *)val)->oa;
3216                 aa->aa_oa = oa;
3217                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3218         }
3219
3220         ptlrpc_request_set_replen(req);
3221         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3222                 LASSERT(set != NULL);
3223                 ptlrpc_set_add_req(set, req);
3224                 ptlrpc_check_set(NULL, set);
3225         } else {
3226                 ptlrpcd_add_req(req);
3227         }
3228
3229         RETURN(0);
3230 }
3231 EXPORT_SYMBOL(osc_set_info_async);
3232
3233 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3234                   struct obd_device *obd, struct obd_uuid *cluuid,
3235                   struct obd_connect_data *data, void *localdata)
3236 {
3237         struct client_obd *cli = &obd->u.cli;
3238
3239         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3240                 long lost_grant;
3241                 long grant;
3242
3243                 spin_lock(&cli->cl_loi_list_lock);
3244                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3245                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3246                         /* restore ocd_grant_blkbits as client page bits */
3247                         data->ocd_grant_blkbits = PAGE_SHIFT;
3248                         grant += cli->cl_dirty_grant;
3249                 } else {
3250                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3251                 }
3252                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3253                 lost_grant = cli->cl_lost_grant;
3254                 cli->cl_lost_grant = 0;
3255                 spin_unlock(&cli->cl_loi_list_lock);
3256
3257                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3258                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3259                        data->ocd_version, data->ocd_grant, lost_grant);
3260         }
3261
3262         RETURN(0);
3263 }
3264 EXPORT_SYMBOL(osc_reconnect);
3265
3266 int osc_disconnect(struct obd_export *exp)
3267 {
3268         struct obd_device *obd = class_exp2obd(exp);
3269         int rc;
3270
3271         rc = client_disconnect_export(exp);
3272         /**
3273          * Initially we put del_shrink_grant before disconnect_export, but it
3274          * causes the following problem if setup (connect) and cleanup
3275          * (disconnect) are tangled together.
3276          *      connect p1                     disconnect p2
3277          *   ptlrpc_connect_import
3278          *     ...............               class_manual_cleanup
3279          *                                     osc_disconnect
3280          *                                     del_shrink_grant
3281          *   ptlrpc_connect_interrupt
3282          *     osc_init_grant
3283          *   add this client to shrink list
3284          *                                      cleanup_osc
3285          * Bang! grant shrink thread trigger the shrink. BUG18662
3286          */
3287         osc_del_grant_list(&obd->u.cli);
3288         return rc;
3289 }
3290 EXPORT_SYMBOL(osc_disconnect);
3291
3292 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3293                                  struct hlist_node *hnode, void *arg)
3294 {
3295         struct lu_env *env = arg;
3296         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3297         struct ldlm_lock *lock;
3298         struct osc_object *osc = NULL;
3299         ENTRY;
3300
3301         lock_res(res);
3302         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3303                 if (lock->l_ast_data != NULL && osc == NULL) {
3304                         osc = lock->l_ast_data;
3305                         cl_object_get(osc2cl(osc));
3306                 }
3307
3308                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3309                  * by the 2nd round of ldlm_namespace_clean() call in
3310                  * osc_import_event(). */
3311                 ldlm_clear_cleaned(lock);
3312         }
3313         unlock_res(res);
3314
3315         if (osc != NULL) {
3316                 osc_object_invalidate(env, osc);
3317                 cl_object_put(env, osc2cl(osc));
3318         }
3319
3320         RETURN(0);
3321 }
3322 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3323
3324 static int osc_import_event(struct obd_device *obd,
3325                             struct obd_import *imp,
3326                             enum obd_import_event event)
3327 {
3328         struct client_obd *cli;
3329         int rc = 0;
3330
3331         ENTRY;
3332         LASSERT(imp->imp_obd == obd);
3333
3334         switch (event) {
3335         case IMP_EVENT_DISCON: {
3336                 cli = &obd->u.cli;
3337                 spin_lock(&cli->cl_loi_list_lock);
3338                 cli->cl_avail_grant = 0;
3339                 cli->cl_lost_grant = 0;
3340                 spin_unlock(&cli->cl_loi_list_lock);
3341                 break;
3342         }
3343         case IMP_EVENT_INACTIVE: {
3344                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3345                 break;
3346         }
3347         case IMP_EVENT_INVALIDATE: {
3348                 struct ldlm_namespace *ns = obd->obd_namespace;
3349                 struct lu_env         *env;
3350                 __u16                  refcheck;
3351
3352                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3353
3354                 env = cl_env_get(&refcheck);
3355                 if (!IS_ERR(env)) {
3356                         osc_io_unplug(env, &obd->u.cli, NULL);
3357
3358                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3359                                                  osc_ldlm_resource_invalidate,
3360                                                  env, 0);
3361                         cl_env_put(env, &refcheck);
3362
3363                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3364                 } else
3365                         rc = PTR_ERR(env);
3366                 break;
3367         }
3368         case IMP_EVENT_ACTIVE: {
3369                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3370                 break;
3371         }
3372         case IMP_EVENT_OCD: {
3373                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3374
3375                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3376                         osc_init_grant(&obd->u.cli, ocd);
3377
3378                 /* See bug 7198 */
3379                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3380                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3381
3382                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3383                 break;
3384         }
3385         case IMP_EVENT_DEACTIVATE: {
3386                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3387                 break;
3388         }
3389         case IMP_EVENT_ACTIVATE: {
3390                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3391                 break;
3392         }
3393         default:
3394                 CERROR("Unknown import event %d\n", event);
3395                 LBUG();
3396         }
3397         RETURN(rc);
3398 }
3399
3400 /**
3401  * Determine whether the lock can be canceled before replaying the lock
3402  * during recovery, see bug16774 for detailed information.
3403  *
3404  * \retval zero the lock can't be canceled
3405  * \retval other ok to cancel
3406  */
3407 static int osc_cancel_weight(struct ldlm_lock *lock)
3408 {
3409         /*
3410          * Cancel all unused and granted extent lock.
3411          */
3412         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3413             ldlm_is_granted(lock) &&
3414             osc_ldlm_weigh_ast(lock) == 0)
3415                 RETURN(1);
3416
3417         RETURN(0);
3418 }
3419
3420 static int brw_queue_work(const struct lu_env *env, void *data)
3421 {
3422         struct client_obd *cli = data;
3423
3424         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3425
3426         osc_io_unplug(env, cli, NULL);
3427         RETURN(0);
3428 }
3429
3430 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3431 {
3432         struct client_obd *cli = &obd->u.cli;
3433         void *handler;
3434         int rc;
3435
3436         ENTRY;
3437
3438         rc = ptlrpcd_addref();
3439         if (rc)
3440                 RETURN(rc);
3441
3442         rc = client_obd_setup(obd, lcfg);
3443         if (rc)
3444                 GOTO(out_ptlrpcd, rc);
3445
3446
3447         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3448         if (IS_ERR(handler))
3449                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3450         cli->cl_writeback_work = handler;
3451
3452         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3453         if (IS_ERR(handler))
3454                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3455         cli->cl_lru_work = handler;
3456
3457         rc = osc_quota_setup(obd);
3458         if (rc)
3459                 GOTO(out_ptlrpcd_work, rc);
3460
3461         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3462         osc_update_next_shrink(cli);
3463
3464         RETURN(rc);
3465
3466 out_ptlrpcd_work:
3467         if (cli->cl_writeback_work != NULL) {
3468                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3469                 cli->cl_writeback_work = NULL;
3470         }
3471         if (cli->cl_lru_work != NULL) {
3472                 ptlrpcd_destroy_work(cli->cl_lru_work);
3473                 cli->cl_lru_work = NULL;
3474         }
3475         client_obd_cleanup(obd);
3476 out_ptlrpcd:
3477         ptlrpcd_decref();
3478         RETURN(rc);
3479 }
3480 EXPORT_SYMBOL(osc_setup_common);
3481
3482 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3483 {
3484         struct client_obd *cli = &obd->u.cli;
3485         int                adding;
3486         int                added;
3487         int                req_count;
3488         int                rc;
3489
3490         ENTRY;
3491
3492         rc = osc_setup_common(obd, lcfg);
3493         if (rc < 0)
3494                 RETURN(rc);
3495
3496         rc = osc_tunables_init(obd);
3497         if (rc)
3498                 RETURN(rc);
3499
3500         /*
3501          * We try to control the total number of requests with a upper limit
3502          * osc_reqpool_maxreqcount. There might be some race which will cause
3503          * over-limit allocation, but it is fine.
3504          */
3505         req_count = atomic_read(&osc_pool_req_count);
3506         if (req_count < osc_reqpool_maxreqcount) {
3507                 adding = cli->cl_max_rpcs_in_flight + 2;
3508                 if (req_count + adding > osc_reqpool_maxreqcount)
3509                         adding = osc_reqpool_maxreqcount - req_count;
3510
3511                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3512                 atomic_add(added, &osc_pool_req_count);
3513         }
3514
3515         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3516
3517         spin_lock(&osc_shrink_lock);
3518         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3519         spin_unlock(&osc_shrink_lock);
3520         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3521         cli->cl_import->imp_idle_debug = D_HA;
3522
3523         RETURN(0);
3524 }
3525
3526 int osc_precleanup_common(struct obd_device *obd)
3527 {
3528         struct client_obd *cli = &obd->u.cli;
3529         ENTRY;
3530
3531         /* LU-464
3532          * for echo client, export may be on zombie list, wait for
3533          * zombie thread to cull it, because cli.cl_import will be
3534          * cleared in client_disconnect_export():
3535          *   class_export_destroy() -> obd_cleanup() ->
3536          *   echo_device_free() -> echo_client_cleanup() ->
3537          *   obd_disconnect() -> osc_disconnect() ->
3538          *   client_disconnect_export()
3539          */
3540         obd_zombie_barrier();
3541         if (cli->cl_writeback_work) {
3542                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3543                 cli->cl_writeback_work = NULL;
3544         }
3545
3546         if (cli->cl_lru_work) {
3547                 ptlrpcd_destroy_work(cli->cl_lru_work);
3548                 cli->cl_lru_work = NULL;
3549         }
3550
3551         obd_cleanup_client_import(obd);
3552         RETURN(0);
3553 }
3554 EXPORT_SYMBOL(osc_precleanup_common);
3555
3556 static int osc_precleanup(struct obd_device *obd)
3557 {
3558         ENTRY;
3559
3560         osc_precleanup_common(obd);
3561
3562         ptlrpc_lprocfs_unregister_obd(obd);
3563         RETURN(0);
3564 }
3565
3566 int osc_cleanup_common(struct obd_device *obd)
3567 {
3568         struct client_obd *cli = &obd->u.cli;
3569         int rc;
3570
3571         ENTRY;
3572
3573         spin_lock(&osc_shrink_lock);
3574         list_del(&cli->cl_shrink_list);
3575         spin_unlock(&osc_shrink_lock);
3576
3577         /* lru cleanup */
3578         if (cli->cl_cache != NULL) {
3579                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3580                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3581                 list_del_init(&cli->cl_lru_osc);
3582                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3583                 cli->cl_lru_left = NULL;
3584                 cl_cache_decref(cli->cl_cache);
3585                 cli->cl_cache = NULL;
3586         }
3587
3588         /* free memory of osc quota cache */
3589         osc_quota_cleanup(obd);
3590
3591         rc = client_obd_cleanup(obd);
3592
3593         ptlrpcd_decref();
3594         RETURN(rc);
3595 }
3596 EXPORT_SYMBOL(osc_cleanup_common);
3597
3598 static const struct obd_ops osc_obd_ops = {
3599         .o_owner                = THIS_MODULE,
3600         .o_setup                = osc_setup,
3601         .o_precleanup           = osc_precleanup,
3602         .o_cleanup              = osc_cleanup_common,
3603         .o_add_conn             = client_import_add_conn,
3604         .o_del_conn             = client_import_del_conn,
3605         .o_connect              = client_connect_import,
3606         .o_reconnect            = osc_reconnect,
3607         .o_disconnect           = osc_disconnect,
3608         .o_statfs               = osc_statfs,
3609         .o_statfs_async         = osc_statfs_async,
3610         .o_create               = osc_create,
3611         .o_destroy              = osc_destroy,
3612         .o_getattr              = osc_getattr,
3613         .o_setattr              = osc_setattr,
3614         .o_iocontrol            = osc_iocontrol,
3615         .o_set_info_async       = osc_set_info_async,
3616         .o_import_event         = osc_import_event,
3617         .o_quotactl             = osc_quotactl,
3618 };
3619
/* shrinker handle: set in osc_init(), removed in osc_exit() */
static struct shrinker *osc_cache_shrinker;
/* devices linked through cl_shrink_list; protected by osc_shrink_lock */
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
3623
3624 #ifndef HAVE_SHRINKER_COUNT
3625 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3626 {
3627         struct shrink_control scv = {
3628                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3629                 .gfp_mask   = shrink_param(sc, gfp_mask)
3630         };
3631         (void)osc_cache_shrink_scan(shrinker, &scv);
3632
3633         return osc_cache_shrink_count(shrinker, &scv);
3634 }
3635 #endif
3636
3637 static int __init osc_init(void)
3638 {
3639         unsigned int reqpool_size;
3640         unsigned int reqsize;
3641         int rc;
3642         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3643                          osc_cache_shrink_count, osc_cache_shrink_scan);
3644         ENTRY;
3645
3646         /* print an address of _any_ initialized kernel symbol from this
3647          * module, to allow debugging with gdb that doesn't support data
3648          * symbols from modules.*/
3649         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3650
3651         rc = lu_kmem_init(osc_caches);
3652         if (rc)
3653                 RETURN(rc);
3654
3655         rc = class_register_type(&osc_obd_ops, NULL, true,
3656                                  LUSTRE_OSC_NAME, &osc_device_type);
3657         if (rc)
3658                 GOTO(out_kmem, rc);
3659
3660         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3661
3662         /* This is obviously too much memory, only prevent overflow here */
3663         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3664                 GOTO(out_type, rc = -EINVAL);
3665
3666         reqpool_size = osc_reqpool_mem_max << 20;
3667
3668         reqsize = 1;
3669         while (reqsize < OST_IO_MAXREQSIZE)
3670                 reqsize = reqsize << 1;
3671
3672         /*
3673          * We don't enlarge the request count in OSC pool according to
3674          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3675          * tried after normal allocation failed. So a small OSC pool won't
3676          * cause much performance degression in most of cases.
3677          */
3678         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3679
3680         atomic_set(&osc_pool_req_count, 0);
3681         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3682                                           ptlrpc_add_rqs_to_pool);
3683
3684         if (osc_rq_pool == NULL)
3685                 GOTO(out_type, rc = -ENOMEM);
3686
3687         rc = osc_start_grant_work();
3688         if (rc != 0)
3689                 GOTO(out_req_pool, rc);
3690
3691         RETURN(rc);
3692
3693 out_req_pool:
3694         ptlrpc_free_rq_pool(osc_rq_pool);
3695 out_type:
3696         class_unregister_type(LUSTRE_OSC_NAME);
3697 out_kmem:
3698         lu_kmem_fini(osc_caches);
3699
3700         RETURN(rc);
3701 }
3702
3703 static void __exit osc_exit(void)
3704 {
3705         osc_stop_grant_work();
3706         remove_shrinker(osc_cache_shrinker);
3707         class_unregister_type(LUSTRE_OSC_NAME);
3708         lu_kmem_fini(osc_caches);
3709         ptlrpc_free_rq_pool(osc_rq_pool);
3710 }
3711
3712 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3713 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3714 MODULE_VERSION(LUSTRE_VERSION_STRING);
3715 MODULE_LICENSE("GPL");
3716
3717 module_init(osc_init);
3718 module_exit(osc_exit);