LU-14125 osc: prevent overflow of o_dropped
fs/lustre-release.git: lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
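
/*
 * Illustrative usage sketch of osc_setattr_async() above; this comment is
 * not part of the original code and the names my_upcall, my_cookie and
 * rqset are hypothetical:
 *
 *	// fire-and-forget: the request is handed to ptlrpcd and no
 *	// completion callback is ever invoked
 *	rc = osc_setattr_async(exp, oa, NULL, NULL, NULL);
 *
 *	// tracked: osc_setattr_interpret() runs on reply and forwards
 *	// its result to my_upcall(my_cookie, rc)
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, rqset);
 */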

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC finishes
 * @cookie:     Opaque cookie passed back to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (standard preallocation) is supported currently;
 * other mode flags are not supported yet. ftruncate(2) and truncate(2)
 * are handled via the SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (standard preallocate), possibly combined with
         * FALLOC_FL_KEEP_SIZE, is supported now. Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
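
/*
 * Minimal sketch of the mode filter used by osc_fallocate_base() above.
 * This helper is illustrative and not part of the original code: only
 * mode == 0 and FALLOC_FL_KEEP_SIZE pass, while e.g. FALLOC_FL_PUNCH_HOLE
 * is rejected by the caller with -EOPNOTSUPP.
 */
static inline bool osc_fallocate_mode_supported(int mode)
{
        /* any bit other than KEEP_SIZE makes the mode unsupported */
        return (mode & ~FALLOC_FL_KEEP_SIZE) == 0;
}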

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally held locks matched by @mode on the resource
 * derived from @oa. Matched locks are added to the @cancels list. Returns
 * the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all,
         * in which case we still want to cancel locks in advance, just
         * cancel them locally without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
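
/*
 * Worked example of the lock-free throttle above (illustrative comment,
 * not part of the original code): with cl_max_rpcs_in_flight == 8 and
 * eight destroys already in flight, atomic_inc_return() yields 9 and the
 * caller is refused. If one of those destroys completes between the inc
 * and the following dec, atomic_dec_return() sees a value below the
 * limit, so the refused caller wakes cl_destroy_waitq itself in case the
 * wake-up issued by osc_destroy_interpret() found no waiter yet.
 */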

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}
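
/*
 * Minimal sketch of the o_dropped clamp performed above. This helper is
 * illustrative only and not part of the original code: o_dropped is a
 * 32-bit wire field, so any lost grant beyond INT_MAX is left in
 * cl_lost_grant and reported on a later RPC instead of overflowing.
 */
static inline u32 osc_clamp_dropped(unsigned long lost_grant)
{
        /* cap at INT_MAX; the caller subtracts only what was reported */
        return lost_grant > INT_MAX ? INT_MAX : (u32)lost_grant;
}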

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
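
/*
 * Illustrative sketch of the shrink target computed above (not part of
 * the original code; the helper name is hypothetical): keep enough grant
 * for a full set of in-flight RPCs plus one, e.g. with 8 RPCs in flight
 * and 256 pages of 4 KiB per RPC the target is 9 * 1 MiB = 9 MiB.
 */
static inline u64 osc_grant_shrink_target(u32 max_rpcs_in_flight,
                                          u32 max_pages_per_rpc)
{
        return (u64)(max_rpcs_in_flight + 1) *
               ((u64)max_pages_per_rpc << PAGE_SHIFT);
}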

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}
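
/*
 * Note on the repeated avail_grant test above (illustrative comment, not
 * part of the original code): cl_loi_list_lock is dropped around the
 * body allocation and osc_announce_cached(), so cl_avail_grant may have
 * changed in the meantime. The target comparison is therefore redone
 * under the lock before any grant is actually deducted, the classic
 * check, drop lock, recheck pattern.
 */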

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler that returns grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
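
/*
 * Illustrative sketch of the chunk alignment done in osc_init_grant()
 * above (not part of the original code; the helper name is hypothetical):
 * rounds a page count up to a whole number of chunks, one chunk being
 * 1 << (chunkbits - PAGE_SHIFT) pages. With 4 KiB pages and 64 KiB
 * chunks, 100 pages round up to 112 (7 chunks of 16 pages each).
 */
static inline unsigned int osc_round_up_to_chunk(unsigned int npages,
                                                 int chunkbits)
{
        unsigned int chunk_pages = 1U << (chunkbits - PAGE_SHIFT);

        return (npages + chunk_pages - 1) & ~(chunk_pages - 1);
}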

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
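
/*
 * Worked example (illustrative comment, not part of the original code):
 * for a 3-page read of 4096 bytes per page and nob_read == 6000, page 0
 * is fully valid, page 1 keeps its first 6000 - 4096 = 1904 bytes and
 * has the remaining 2192 bytes zeroed, and page 2 is zeroed entirely.
 */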

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
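
/*
 * Example (illustrative comment, not part of the original code): two
 * pages at (off 0, count 4096) and (off 4096, count 4096) with identical
 * flags are contiguous and merge into a single niobuf; the same pages
 * with a hole between them, or with flags that differ outside the mask
 * above, start a new niobuf instead.
 */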

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * There should be enough guard slots left in the bounce
                 * buffer to hold the checksums of a whole data page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */
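
/*
 * Capacity math for the guard bounce buffer above (illustrative comment,
 * not part of the original code): with 4 KiB pages the buffer holds
 * PAGE_SIZE / sizeof(__u16) == 2048 guard tags, and one fully-used 4 KiB
 * data page with 512-byte sectors contributes 8 tags, so the running
 * hash is flushed roughly once every 256 data pages.
 */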

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}
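
/*
 * Worked example of the bounce-page bookkeeping above (illustrative
 * comment, not part of the original code): a clear-text write of 10
 * bytes at in-page offset 100 is expanded by osc_brw_prep_request()
 * below to one full encryption unit, so count becomes
 * LUSTRE_ENCRYPTION_UNIT_SIZE and off is rounded down to the page
 * boundary; bp_count_diff and bp_off_diff record the deltas so the loop
 * above can restore the original count and off once the RPC completes.
 */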
1384
1385 static int
1386 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1387                      u32 page_count, struct brw_page **pga,
1388                      struct ptlrpc_request **reqp, int resend)
1389 {
1390         struct ptlrpc_request *req;
1391         struct ptlrpc_bulk_desc *desc;
1392         struct ost_body *body;
1393         struct obd_ioobj *ioobj;
1394         struct niobuf_remote *niobuf;
1395         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1396         struct osc_brw_async_args *aa;
1397         struct req_capsule *pill;
1398         struct brw_page *pg_prev;
1399         void *short_io_buf;
1400         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1401         struct inode *inode;
1402
1403         ENTRY;
1404         inode = page2inode(pga[0]->pg);
1405         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1406                 RETURN(-ENOMEM); /* Recoverable */
1407         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1408                 RETURN(-EINVAL); /* Fatal */
1409
1410         if ((cmd & OBD_BRW_WRITE) != 0) {
1411                 opc = OST_WRITE;
1412                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1413                                                 osc_rq_pool,
1414                                                 &RQF_OST_BRW_WRITE);
1415         } else {
1416                 opc = OST_READ;
1417                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1418         }
1419         if (req == NULL)
1420                 RETURN(-ENOMEM);
1421
1422         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1423                 for (i = 0; i < page_count; i++) {
1424                         struct brw_page *pg = pga[i];
1425                         struct page *data_page = NULL;
1426                         bool retried = false;
1427                         bool lockedbymyself;
1428                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1429
1430 retry_encrypt:
1431                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1432                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1433                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1434                         /* The page can already be locked when we arrive here.
1435                          * This is possible when cl_page_assume/vvp_page_assume
1436                          * is stuck on wait_on_page_writeback with page lock
1437                          * held. In this case there is no risk for the lock to
1438                          * be released while we are doing our encryption
1439                          * processing, because writeback against that page will
1440                          * end in vvp_page_completion_write/cl_page_completion,
1441                          * which means only once the page is fully processed.
1442                          */
1443                         lockedbymyself = trylock_page(pg->pg);
1444                         data_page =
1445                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1446                                                                  nunits, 0,
1447                                                                  GFP_NOFS);
1448                         if (lockedbymyself)
1449                                 unlock_page(pg->pg);
1450                         if (IS_ERR(data_page)) {
1451                                 rc = PTR_ERR(data_page);
1452                                 if (rc == -ENOMEM && !retried) {
1453                                         retried = true;
1454                                         rc = 0;
1455                                         goto retry_encrypt;
1456                                 }
1457                                 ptlrpc_request_free(req);
1458                                 RETURN(rc);
1459                         }
1460                         /* Set PageChecked flag on bounce page for
1461                          * disambiguation in osc_release_bounce_pages().
1462                          */
1463                         SetPageChecked(data_page);
1464                         pg->pg = data_page;
1465                         /* no gaps in the page array: the last page yields the end offset for o_size */
1466                         if (i == page_count - 1) {
1467                                 struct osc_async_page *oap = brw_page2oap(pg);
1468
1469                                 oa->o_size = oap->oap_count +
1470                                         oap->oap_obj_off + oap->oap_page_off;
1471                         }
1472                         /* len is forced to nunits, and relative offset to 0
1473                          * so store the old, clear text info
1474                          */
1475                         pg->bp_count_diff = nunits - pg->count;
1476                         pg->count = nunits;
1477                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1478                         pg->off = pg->off & PAGE_MASK;
1479                 }
1480         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1481                 for (i = 0; i < page_count; i++) {
1482                         struct brw_page *pg = pga[i];
1483                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1484
1485                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1486                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1487                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1488                         /* count/off are forced to cover the whole encryption
1489                          * unit size so that all encrypted data is stored on the
1490                          * OST, so adjust bp_{count,off}_diff for the size of
1491                          * the clear text.
1492                          */
1493                         pg->bp_count_diff = nunits - pg->count;
1494                         pg->count = nunits;
1495                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1496                         pg->off = pg->off & PAGE_MASK;
1497                 }
1498         }
1499
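             /* Contiguous pages will be merged into a single remote niobuf,
              * so niocount may be smaller than page_count. */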
1500         for (niocount = i = 1; i < page_count; i++) {
1501                 if (!can_merge_pages(pga[i - 1], pga[i]))
1502                         niocount++;
1503         }
1504
1505         pill = &req->rq_pill;
1506         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1507                              sizeof(*ioobj));
1508         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1509                              niocount * sizeof(*niobuf));
1510
1511         for (i = 0; i < page_count; i++) {
1512                 short_io_size += pga[i]->count;
1513                 if (!inode || !IS_ENCRYPTED(inode)) {
1514                         pga[i]->bp_count_diff = 0;
1515                         pga[i]->bp_off_diff = 0;
1516                 }
1517         }
1518
1519         /* Check if read/write is small enough to be a short io. */
1520         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1521             !imp_connect_shortio(cli->cl_import))
1522                 short_io_size = 0;
1523
1524         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1525                              opc == OST_READ ? 0 : short_io_size);
1526         if (opc == OST_READ)
1527                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1528                                      short_io_size);
1529
1530         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1531         if (rc) {
1532                 ptlrpc_request_free(req);
1533                 RETURN(rc);
1534         }
1535         osc_set_io_portal(req);
1536
1537         ptlrpc_at_set_req_timeout(req);
1538         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1539          * retry logic */
1540         req->rq_no_retry_einprogress = 1;
1541
1542         if (short_io_size != 0) {
1543                 desc = NULL;
1544                 short_io_buf = NULL;
1545                 goto no_bulk;
1546         }
1547
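             /* The negotiated ocd_brw_size divided by the LNet MTU gives the
              * maximum number of bulk MDs this transfer may need; writes use
              * GET_SOURCE (the server pulls the data from the client), reads
              * use PUT_SINK (the server pushes data into the client sink). */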
1548         desc = ptlrpc_prep_bulk_imp(req, page_count,
1549                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1550                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1551                         PTLRPC_BULK_PUT_SINK),
1552                 OST_BULK_PORTAL,
1553                 &ptlrpc_bulk_kiov_pin_ops);
1554
1555         if (desc == NULL)
1556                 GOTO(out, rc = -ENOMEM);
1557         /* NB request now owns desc and will free it when the request is freed */
1558 no_bulk:
1559         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1560         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1561         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1562         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1563
1564         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1565
1566         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1567          * and from_kgid(), because these operations are asynchronous.
1568          * Fortunately, the oa variable already contains valid o_uid and o_gid
1569          * values here, and filling them is enough for nrs-tbf, see LU-9658.
1570          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid
1571          * breaking other processing logic. */
1572         body->oa.o_uid = oa->o_uid;
1573         body->oa.o_gid = oa->o_gid;
1574
1575         obdo_to_ioobj(oa, ioobj);
1576         ioobj->ioo_bufcnt = niocount;
1577         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1578          * of bulks that might be sent for this request.  The actual number is
1579          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1580          * sends "max - 1" for compatibility with old clients sending "0", and
1581          * also so the actual maximum is a power-of-two, not one less. LU-1431 */
1582         if (desc != NULL)
1583                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1584         else /* short io */
1585                 ioobj_max_brw_set(ioobj, 0);
1586
1587         if (short_io_size != 0) {
1588                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1589                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1590                         body->oa.o_flags = 0;
1591                 }
1592                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1593                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1594                        short_io_size);
1595                 if (opc == OST_WRITE) {
1596                         short_io_buf = req_capsule_client_get(pill,
1597                                                               &RMF_SHORT_IO);
1598                         LASSERT(short_io_buf != NULL);
1599                 }
1600         }
1601
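             /* Walk the offset-sorted page array: verify the pages are gapless
              * and ascending, copy the data inline for short i/o writes or
              * attach each page to the bulk descriptor otherwise, and build
              * the niobufs, merging contiguous pages into a single entry. */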
1602         LASSERT(page_count > 0);
1603         pg_prev = pga[0];
1604         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1605                 struct brw_page *pg = pga[i];
1606                 int poff = pg->off & ~PAGE_MASK;
1607
1608                 LASSERT(pg->count > 0);
1609                 /* make sure there is no gap in the middle of page array */
1610                 LASSERTF(page_count == 1 ||
1611                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1612                           ergo(i > 0 && i < page_count - 1,
1613                                poff == 0 && pg->count == PAGE_SIZE)   &&
1614                           ergo(i == page_count - 1, poff == 0)),
1615                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1616                          i, page_count, pg, pg->off, pg->count);
1617                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1618                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1619                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1620                          i, page_count,
1621                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1622                          pg_prev->pg, page_private(pg_prev->pg),
1623                          pg_prev->pg->index, pg_prev->off);
1624                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1625                         (pg->flag & OBD_BRW_SRVLOCK));
1626                 if (short_io_size != 0 && opc == OST_WRITE) {
1627                         unsigned char *ptr = kmap_atomic(pg->pg);
1628
1629                         LASSERT(short_io_size >= requested_nob + pg->count);
1630                         memcpy(short_io_buf + requested_nob,
1631                                ptr + poff,
1632                                pg->count);
1633                         kunmap_atomic(ptr);
1634                 } else if (short_io_size == 0) {
1635                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1636                                                          pg->count);
1637                 }
1638                 requested_nob += pg->count;
1639
1640                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1641                         niobuf--;
1642                         niobuf->rnb_len += pg->count;
1643                 } else {
1644                         niobuf->rnb_offset = pg->off;
1645                         niobuf->rnb_len    = pg->count;
1646                         niobuf->rnb_flags  = pg->flag;
1647                 }
1648                 pg_prev = pg;
1649         }
1650
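             /* niobuf was advanced once per page and stepped back on every
              * merge, so it must end up exactly niocount entries past the
              * start of the RMF_NIOBUF_REMOTE buffer. */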
1651         LASSERTF((void *)(niobuf - niocount) ==
1652                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1653                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1654                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1655
1656         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1657         if (resend) {
1658                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1659                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1660                         body->oa.o_flags = 0;
1661                 }
1662                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1663         }
1664
1665         if (osc_should_shrink_grant(cli))
1666                 osc_shrink_grant_local(cli, &body->oa);
1667
1668         /* size[REQ_REC_OFF] still sizeof (*body) */
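             /* For writes, checksum the outgoing pages now unless the sptlrpc
              * bulk flavor already provides integrity; for reads, only tell
              * the server which checksum type to use in its reply. */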
1669         if (opc == OST_WRITE) {
1670                 if (cli->cl_checksum &&
1671                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1672                         /* store cl_cksum_type in a local variable since
1673                          * it can be changed via lprocfs */
1674                         enum cksum_types cksum_type = cli->cl_cksum_type;
1675
1676                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1677                                 body->oa.o_flags = 0;
1678
1679                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1680                                                                 cksum_type);
1681                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1682
1683                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1684                                                   requested_nob, page_count,
1685                                                   pga, OST_WRITE,
1686                                                   &body->oa.o_cksum);
1687                         if (rc < 0) {
1688                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1689                                        rc);
1690                                 GOTO(out, rc);
1691                         }
1692                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1693                                body->oa.o_cksum);
1694
1695                         /* save this in 'oa', too, for later checking */
1696                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1697                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1698                                                            cksum_type);
1699                 } else {
1700                         /* clear out the checksum flag, in case this is a
1701                          * resend but cl_checksum is no longer set. b=11238 */
1702                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1703                 }
1704                 oa->o_cksum = body->oa.o_cksum;
1705                 /* 1 RC per niobuf */
1706                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1707                                      sizeof(__u32) * niocount);
1708         } else {
1709                 if (cli->cl_checksum &&
1710                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1711                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1712                                 body->oa.o_flags = 0;
1713                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1714                                 cli->cl_cksum_type);
1715                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1716                 }
1717
1718                 /* The client cksum has already been copied to the wire obdo
1719                  * in the previous lustre_set_wire_obdo(); in case a bulk-read
1720                  * is being resent due to a cksum error, this allows the server
1721                  * to check+dump the pages on its side. */
1722         }
1723         ptlrpc_request_set_replen(req);
1724
1725         aa = ptlrpc_req_async_args(aa, req);
1726         aa->aa_oa = oa;
1727         aa->aa_requested_nob = requested_nob;
1728         aa->aa_nio_count = niocount;
1729         aa->aa_page_count = page_count;
1730         aa->aa_resends = 0;
1731         aa->aa_ppga = pga;
1732         aa->aa_cli = cli;
1733         INIT_LIST_HEAD(&aa->aa_oaps);
1734
1735         *reqp = req;
1736         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1737         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1738                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1739                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1740         RETURN(0);
1741
1742  out:
1743         ptlrpc_req_finished(req);
1744         RETURN(rc);
1745 }
1746
1747 char dbgcksum_file_name[PATH_MAX];
1748
1749 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1750                                 struct brw_page **pga, __u32 server_cksum,
1751                                 __u32 client_cksum)
1752 {
1753         struct file *filp;
1754         int rc, i;
1755         unsigned int len;
1756         char *buf;
1757
1758         /* Only keep a dump of the pages on the first error for a given range
1759          * in the file/fid; do not dump again during resends/retries. */
1760         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1761                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1762                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1763                   libcfs_debug_file_path_arr :
1764                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1765                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1766                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1767                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1768                  pga[0]->off,
1769                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1770                  client_cksum, server_cksum);
1771         filp = filp_open(dbgcksum_file_name,
1772                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1773         if (IS_ERR(filp)) {
1774                 rc = PTR_ERR(filp);
1775                 if (rc == -EEXIST)
1776                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1777                                "checksum error: rc = %d\n", dbgcksum_file_name,
1778                                rc);
1779                 else
1780                         CERROR("%s: can't open to dump pages with checksum "
1781                                "error: rc = %d\n", dbgcksum_file_name, rc);
1782                 return;
1783         }
1784
1785         for (i = 0; i < page_count; i++) {
1786                 len = pga[i]->count;
1787                 buf = kmap(pga[i]->pg);
1788                 while (len != 0) {
1789                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1790                         if (rc < 0) {
1791                                 CERROR("%s: wanted to write %u but got %d "
1792                                        "error\n", dbgcksum_file_name, len, rc);
1793                                 break;
1794                         }
1795                         len -= rc;
1796                         buf += rc;
1797                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1798                                dbgcksum_file_name, rc);
1799                 }
1800                 kunmap(pga[i]->pg);
1801         }
1802
1803         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1804         if (rc)
1805                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1806         filp_close(filp, NULL);
1807 }
1808
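/* Verify a bulk write by comparing the client- and server-computed checksums;
 * on mismatch, re-checksum the local pages to narrow down where the data
 * changed. Returns 0 if the checksums match, 1 on mismatch (which the caller
 * turns into -EAGAIN to resend the request). */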
1809 static int
1810 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1811                      __u32 client_cksum, __u32 server_cksum,
1812                      struct osc_brw_async_args *aa)
1813 {
1814         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1815         enum cksum_types cksum_type;
1816         obd_dif_csum_fn *fn = NULL;
1817         int sector_size = 0;
1818         __u32 new_cksum;
1819         char *msg;
1820         int rc;
1821
1822         if (server_cksum == client_cksum) {
1823                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1824                 return 0;
1825         }
1826
1827         if (aa->aa_cli->cl_checksum_dump)
1828                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1829                                     server_cksum, client_cksum);
1830
1831         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1832                                            oa->o_flags : 0);
1833
1834         switch (cksum_type) {
1835         case OBD_CKSUM_T10IP512:
1836                 fn = obd_dif_ip_fn;
1837                 sector_size = 512;
1838                 break;
1839         case OBD_CKSUM_T10IP4K:
1840                 fn = obd_dif_ip_fn;
1841                 sector_size = 4096;
1842                 break;
1843         case OBD_CKSUM_T10CRC512:
1844                 fn = obd_dif_crc_fn;
1845                 sector_size = 512;
1846                 break;
1847         case OBD_CKSUM_T10CRC4K:
1848                 fn = obd_dif_crc_fn;
1849                 sector_size = 4096;
1850                 break;
1851         default:
1852                 break;
1853         }
1854
1855         if (fn)
1856                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1857                                              aa->aa_page_count, aa->aa_ppga,
1858                                              OST_WRITE, fn, sector_size,
1859                                              &new_cksum);
1860         else
1861                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1862                                        aa->aa_ppga, OST_WRITE, cksum_type,
1863                                        &new_cksum);
1864
1865         if (rc < 0)
1866                 msg = "failed to calculate the client write checksum";
1867         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1868                 msg = "the server did not use the checksum type specified in "
1869                       "the original request - likely a protocol problem";
1870         else if (new_cksum == server_cksum)
1871                 msg = "changed on the client after we checksummed it - "
1872                       "likely false positive due to mmap IO (bug 11742)";
1873         else if (new_cksum == client_cksum)
1874                 msg = "changed in transit before arrival at OST";
1875         else
1876                 msg = "changed in transit AND doesn't match the original - "
1877                       "likely false positive due to mmap IO (bug 11742)";
1878
1879         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1880                            DFID " object "DOSTID" extent [%llu-%llu], original "
1881                            "client csum %x (type %x), server csum %x (type %x),"
1882                            " client csum now %x\n",
1883                            obd_name, msg, libcfs_nid2str(peer->nid),
1884                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1885                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1886                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1887                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1888                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1889                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1890                            client_cksum,
1891                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1892                            server_cksum, cksum_type, new_cksum);
1893         return 1;
1894 }
1895
1896 /* Note: rc enters this function as the number of bytes transferred */
1897 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1898 {
1899         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1900         struct client_obd *cli = aa->aa_cli;
1901         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1902         const struct lnet_process_id *peer =
1903                 &req->rq_import->imp_connection->c_peer;
1904         struct ost_body *body;
1905         u32 client_cksum = 0;
1906         struct inode *inode;
1907
1908         ENTRY;
1909
1910         if (rc < 0 && rc != -EDQUOT) {
1911                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1912                 RETURN(rc);
1913         }
1914
1915         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1916         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1917         if (body == NULL) {
1918                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1919                 RETURN(-EPROTO);
1920         }
1921
1922         /* set/clear over quota flag for a uid/gid/projid */
1923         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1924             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1925                 unsigned qid[LL_MAXQUOTAS] = {
1926                                          body->oa.o_uid, body->oa.o_gid,
1927                                          body->oa.o_projid };
1928                 CDEBUG(D_QUOTA,
1929                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1930                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1931                        body->oa.o_valid, body->oa.o_flags);
1932                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1933                                 body->oa.o_flags);
1934         }
1935
1936         osc_update_grant(cli, body);
1937
1938         if (rc < 0)
1939                 RETURN(rc);
1940
1941         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1942                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1943
1944         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1945                 if (rc > 0) {
1946                         CERROR("%s: unexpected positive size %d\n",
1947                                obd_name, rc);
1948                         RETURN(-EPROTO);
1949                 }
1950
1951                 if (req->rq_bulk != NULL &&
1952                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1953                         RETURN(-EAGAIN);
1954
1955                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1956                     check_write_checksum(&body->oa, peer, client_cksum,
1957                                          body->oa.o_cksum, aa))
1958                         RETURN(-EAGAIN);
1959
1960                 rc = check_write_rcs(req, aa->aa_requested_nob,
1961                                      aa->aa_nio_count, aa->aa_page_count,
1962                                      aa->aa_ppga);
1963                 GOTO(out, rc);
1964         }
1965
1966         /* The rest of this function executes only for OST_READs */
1967
1968         if (req->rq_bulk == NULL) {
1969                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1970                                           RCL_SERVER);
1971                 LASSERT(rc == req->rq_status);
1972         } else {
1973                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1974                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1975         }
1976         if (rc < 0)
1977                 GOTO(out, rc = -EAGAIN);
1978
1979         if (rc > aa->aa_requested_nob) {
1980                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1981                        rc, aa->aa_requested_nob);
1982                 RETURN(-EPROTO);
1983         }
1984
1985         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1986                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1987                        rc, req->rq_bulk->bd_nob_transferred);
1988                 RETURN(-EPROTO);
1989         }
1990
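             /* Short i/o reads return the data inline in the reply message
              * instead of via bulk; copy it out into the destination pages. */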
1991         if (req->rq_bulk == NULL) {
1992                 /* short io */
1993                 int nob, pg_count, i = 0;
1994                 unsigned char *buf;
1995
1996                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1997                 pg_count = aa->aa_page_count;
1998                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1999                                                    rc);
2000                 nob = rc;
2001                 while (nob > 0 && pg_count > 0) {
2002                         unsigned char *ptr;
2003                         int count = aa->aa_ppga[i]->count > nob ?
2004                                     nob : aa->aa_ppga[i]->count;
2005
2006                         CDEBUG(D_CACHE, "page %p count %d\n",
2007                                aa->aa_ppga[i]->pg, count);
2008                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2009                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2010                                count);
2011                         kunmap_atomic((void *) ptr);
2012
2013                         buf += count;
2014                         nob -= count;
2015                         i++;
2016                         pg_count--;
2017                 }
2018         }
2019
2020         if (rc < aa->aa_requested_nob)
2021                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2022
2023         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2024                 static int cksum_counter;
2025                 u32        server_cksum = body->oa.o_cksum;
2026                 char      *via = "";
2027                 char      *router = "";
2028                 enum cksum_types cksum_type;
2029                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2030                         body->oa.o_flags : 0;
2031
2032                 cksum_type = obd_cksum_type_unpack(o_flags);
2033                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2034                                           aa->aa_page_count, aa->aa_ppga,
2035                                           OST_READ, &client_cksum);
2036                 if (rc < 0)
2037                         GOTO(out, rc);
2038
2039                 if (req->rq_bulk != NULL &&
2040                     peer->nid != req->rq_bulk->bd_sender) {
2041                         via = " via ";
2042                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2043                 }
2044
2045                 if (server_cksum != client_cksum) {
2046                         struct ost_body *clbody;
2047                         u32 page_count = aa->aa_page_count;
2048
2049                         clbody = req_capsule_client_get(&req->rq_pill,
2050                                                         &RMF_OST_BODY);
2051                         if (cli->cl_checksum_dump)
2052                                 dump_all_bulk_pages(&clbody->oa, page_count,
2053                                                     aa->aa_ppga, server_cksum,
2054                                                     client_cksum);
2055
2056                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2057                                            "%s%s%s inode "DFID" object "DOSTID
2058                                            " extent [%llu-%llu], client %x, "
2059                                            "server %x, cksum_type %x\n",
2060                                            obd_name,
2061                                            libcfs_nid2str(peer->nid),
2062                                            via, router,
2063                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2064                                                 clbody->oa.o_parent_seq : 0ULL,
2065                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2066                                                 clbody->oa.o_parent_oid : 0,
2067                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2068                                                 clbody->oa.o_parent_ver : 0,
2069                                            POSTID(&body->oa.o_oi),
2070                                            aa->aa_ppga[0]->off,
2071                                            aa->aa_ppga[page_count-1]->off +
2072                                            aa->aa_ppga[page_count-1]->count - 1,
2073                                            client_cksum, server_cksum,
2074                                            cksum_type);
2075                         cksum_counter = 0;
2076                         aa->aa_oa->o_cksum = client_cksum;
2077                         rc = -EAGAIN;
2078                 } else {
2079                         cksum_counter++;
2080                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2081                         rc = 0;
2082                 }
2083         } else if (unlikely(client_cksum)) {
2084                 static int cksum_missed;
2085
2086                 cksum_missed++;
2087                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2088                         CERROR("%s: checksum %u requested from %s but not sent\n",
2089                                obd_name, cksum_missed,
2090                                libcfs_nid2str(peer->nid));
2091         } else {
2092                 rc = 0;
2093         }
2094
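             /* For encrypted files the pages received hold ciphertext: decrypt
              * each LUSTRE_ENCRYPTION_UNIT_SIZE unit in place, skipping
              * all-zero units, which carry no encrypted data. */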
2095         inode = page2inode(aa->aa_ppga[0]->pg);
2096         if (inode && IS_ENCRYPTED(inode)) {
2097                 int idx;
2098
2099                 if (!llcrypt_has_encryption_key(inode)) {
2100                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2101                         GOTO(out, rc);
2102                 }
2103                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2104                         struct brw_page *pg = aa->aa_ppga[idx];
2105                         unsigned int offs = 0;
2106
2107                         while (offs < PAGE_SIZE) {
2108                                 /* do not decrypt if page is all 0s */
2109                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2110                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2111                                         /* if the page is empty, tell upper
2112                                          * layers (ll_io_zero_page) by
2113                                          * clearing PagePrivate2
2114                                          */
2115                                         if (!offs)
2116                                                 ClearPagePrivate2(pg->pg);
2117                                         break;
2118                                 }
2119
2120                                 /* The page is already locked when we arrive here,
2121                                  * except when we deal with a twisted page for
2122                                  * specific Direct IO support, in which case
2123                                  * the PageChecked flag is set on the page.
2124                                  */
2125                                 if (PageChecked(pg->pg))
2126                                         lock_page(pg->pg);
2127                                 rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2128                                                     LUSTRE_ENCRYPTION_UNIT_SIZE,
2129                                                                       offs);
2130                                 if (PageChecked(pg->pg))
2131                                         unlock_page(pg->pg);
2132                                 if (rc)
2133                                         GOTO(out, rc);
2134
2135                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2136                         }
2137                 }
2138         }
2139
2140 out:
2141         if (rc >= 0)
2142                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2143                                      aa->aa_oa, &body->oa);
2144
2145         RETURN(rc);
2146 }
2147
2148 static int osc_brw_redo_request(struct ptlrpc_request *request,
2149                                 struct osc_brw_async_args *aa, int rc)
2150 {
2151         struct ptlrpc_request *new_req;
2152         struct osc_brw_async_args *new_aa;
2153         struct osc_async_page *oap;
2154         ENTRY;
2155
2156         /* The below message is checked in replay-ost-single.sh test_8ae */
2157         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2158                   "redo for recoverable error %d", rc);
2159
2160         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2161                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2162                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2163                                   aa->aa_ppga, &new_req, 1);
2164         if (rc)
2165                 RETURN(rc);
2166
2167         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2168                 if (oap->oap_request != NULL) {
2169                         LASSERTF(request == oap->oap_request,
2170                                  "request %p != oap_request %p\n",
2171                                  request, oap->oap_request);
2172                 }
2173         }
2174         /*
2175          * New request takes over pga and oaps from old request.
2176          * Note that copying a list_head doesn't work, need to move it...
2177          */
2178         aa->aa_resends++;
2179         new_req->rq_interpret_reply = request->rq_interpret_reply;
2180         new_req->rq_async_args = request->rq_async_args;
2181         new_req->rq_commit_cb = request->rq_commit_cb;
2182         /* cap resend delay to the current request timeout, this is similar to
2183          * what ptlrpc does (see after_reply()) */
2184         if (aa->aa_resends > new_req->rq_timeout)
2185                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2186         else
2187                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2188         new_req->rq_generation_set = 1;
2189         new_req->rq_import_generation = request->rq_import_generation;
2190
2191         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2192
2193         INIT_LIST_HEAD(&new_aa->aa_oaps);
2194         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2195         INIT_LIST_HEAD(&new_aa->aa_exts);
2196         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2197         new_aa->aa_resends = aa->aa_resends;
2198
2199         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2200                 if (oap->oap_request) {
2201                         ptlrpc_req_finished(oap->oap_request);
2202                         oap->oap_request = ptlrpc_request_addref(new_req);
2203                 }
2204         }
2205
2206         /* XXX: This code will run into problems if we ever support adding
2207          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2208          * waiting for all of them to finish. We should inherit the request
2209          * set from the old request. */
2210         ptlrpcd_add_req(new_req);
2211
2212         DEBUG_REQ(D_INFO, new_req, "new request");
2213         RETURN(0);
2214 }
2215
2216 /*
2217  * Ugh, we want disk allocation on the target to happen in offset order. We'll
2218  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
2219  * do fine for our small page arrays and doesn't require allocation. It's an
2220  * insertion sort that swaps elements that are strides apart, shrinking the
2221  * stride (Knuth's 3h+1 gaps: 1, 4, 13, ...) down to 1, leaving it sorted.
2222  */
2223 static void sort_brw_pages(struct brw_page **array, int num)
2224 {
2225         int stride, i, j;
2226         struct brw_page *tmp;
2227
2228         if (num == 1)
2229                 return;
2230         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2231                 ;
2232
2233         do {
2234                 stride /= 3;
2235                 for (i = stride ; i < num ; i++) {
2236                         tmp = array[i];
2237                         j = i;
2238                         while (j >= stride && array[j - stride]->off > tmp->off) {
2239                                 array[j] = array[j - stride];
2240                                 j -= stride;
2241                         }
2242                         array[j] = tmp;
2243                 }
2244         } while (stride > 1);
2245 }
2246
2247 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2248 {
2249         LASSERT(ppga != NULL);
2250         OBD_FREE_PTR_ARRAY(ppga, count);
2251 }
2252
2253 static int brw_interpret(const struct lu_env *env,
2254                          struct ptlrpc_request *req, void *args, int rc)
2255 {
2256         struct osc_brw_async_args *aa = args;
2257         struct osc_extent *ext;
2258         struct osc_extent *tmp;
2259         struct client_obd *cli = aa->aa_cli;
2260         unsigned long transferred = 0;
2261
2262         ENTRY;
2263
2264         rc = osc_brw_fini_request(req, rc);
2265         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2266
2267         /* restore clear text pages */
2268         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2269
2270         /*
2271          * When server returns -EINPROGRESS, client should always retry
2272          * regardless of the number of times the bulk was resent already.
2273          */
2274         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2275                 if (req->rq_import_generation !=
2276                     req->rq_import->imp_generation) {
2277                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2278                                ""DOSTID", rc = %d.\n",
2279                                req->rq_import->imp_obd->obd_name,
2280                                POSTID(&aa->aa_oa->o_oi), rc);
2281                 } else if (rc == -EINPROGRESS ||
2282                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2283                         rc = osc_brw_redo_request(req, aa, rc);
2284                 } else {
2285                         CERROR("%s: too many resent retries for object: "
2286                                "%llu:%llu, rc = %d.\n",
2287                                req->rq_import->imp_obd->obd_name,
2288                                POSTID(&aa->aa_oa->o_oi), rc);
2289                 }
2290
2291                 if (rc == 0)
2292                         RETURN(0);
2293                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2294                         rc = -EIO;
2295         }
2296
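             /* On success, fold the attributes returned by the OST (blocks,
              * {a,c,m}time) back into the cached cl_object attributes, and
              * for writes extend the known file size and KMS if this i/o
              * grew the file. */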
2297         if (rc == 0) {
2298                 struct obdo *oa = aa->aa_oa;
2299                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2300                 unsigned long valid = 0;
2301                 struct cl_object *obj;
2302                 struct osc_async_page *last;
2303
2304                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2305                 obj = osc2cl(last->oap_obj);
2306
2307                 cl_object_attr_lock(obj);
2308                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2309                         attr->cat_blocks = oa->o_blocks;
2310                         valid |= CAT_BLOCKS;
2311                 }
2312                 if (oa->o_valid & OBD_MD_FLMTIME) {
2313                         attr->cat_mtime = oa->o_mtime;
2314                         valid |= CAT_MTIME;
2315                 }
2316                 if (oa->o_valid & OBD_MD_FLATIME) {
2317                         attr->cat_atime = oa->o_atime;
2318                         valid |= CAT_ATIME;
2319                 }
2320                 if (oa->o_valid & OBD_MD_FLCTIME) {
2321                         attr->cat_ctime = oa->o_ctime;
2322                         valid |= CAT_CTIME;
2323                 }
2324
2325                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2326                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2327                         loff_t last_off = last->oap_count + last->oap_obj_off +
2328                                 last->oap_page_off;
2329
2330                         /* Change the file size if this is an out-of-quota
2331                          * or direct IO write that extends the file size */
2332                         if (loi->loi_lvb.lvb_size < last_off) {
2333                                 attr->cat_size = last_off;
2334                                 valid |= CAT_SIZE;
2335                         }
2336                         /* Extend KMS if it's not a lockless write */
2337                         if (loi->loi_kms < last_off &&
2338                             oap2osc_page(last)->ops_srvlock == 0) {
2339                                 attr->cat_kms = last_off;
2340                                 valid |= CAT_KMS;
2341                         }
2342                 }
2343
2344                 if (valid != 0)
2345                         cl_object_attr_update(env, obj, attr, valid);
2346                 cl_object_attr_unlock(obj);
2347         }
2348         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2349         aa->aa_oa = NULL;
2350
2351         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2352                 osc_inc_unstable_pages(req);
2353
2354         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2355                 list_del_init(&ext->oe_link);
2356                 osc_extent_finish(env, ext, 1,
2357                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2358         }
2359         LASSERT(list_empty(&aa->aa_exts));
2360         LASSERT(list_empty(&aa->aa_oaps));
2361
2362         transferred = (req->rq_bulk == NULL ? /* short io */
2363                        aa->aa_requested_nob :
2364                        req->rq_bulk->bd_nob_transferred);
2365
2366         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2367         ptlrpc_lprocfs_brw(req, transferred);
2368
2369         spin_lock(&cli->cl_loi_list_lock);
2370         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2371          * is called so we know whether to go to sync BRWs or wait for more
2372          * RPCs to complete */
2373         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2374                 cli->cl_w_in_flight--;
2375         else
2376                 cli->cl_r_in_flight--;
2377         osc_wake_cache_waiters(cli);
2378         spin_unlock(&cli->cl_loi_list_lock);
2379
2380         osc_io_unplug(env, cli, NULL);
2381         RETURN(rc);
2382 }
2383
2384 static void brw_commit(struct ptlrpc_request *req)
2385 {
2386         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2387          * this callback (invoked via rq_commit_cb), we need to ensure
2388          * osc_dec_unstable_pages is still called. Otherwise unstable
2389          * pages may be leaked. */
2390         spin_lock(&req->rq_lock);
2391         if (likely(req->rq_unstable)) {
2392                 req->rq_unstable = 0;
2393                 spin_unlock(&req->rq_lock);
2394
2395                 osc_dec_unstable_pages(req);
2396         } else {
2397                 req->rq_committed = 1;
2398                 spin_unlock(&req->rq_lock);
2399         }
2400 }
2401
2402 /**
2403  * Build an RPC from the list of extents @ext_list. The caller must ensure
2404  * that the total pages in this list do NOT exceed the max pages per RPC.
2405  * Extents in the list must be in OES_RPC state.
2406  */
2407 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2408                   struct list_head *ext_list, int cmd)
2409 {
2410         struct ptlrpc_request           *req = NULL;
2411         struct osc_extent               *ext;
2412         struct brw_page                 **pga = NULL;
2413         struct osc_brw_async_args       *aa = NULL;
2414         struct obdo                     *oa = NULL;
2415         struct osc_async_page           *oap;
2416         struct osc_object               *obj = NULL;
2417         struct cl_req_attr              *crattr = NULL;
2418         loff_t                          starting_offset = OBD_OBJECT_EOF;
2419         loff_t                          ending_offset = 0;
2420         /* '1' for consistency with code that checks !mpflag to restore */
2421         int mpflag = 1;
2422         int                             mem_tight = 0;
2423         int                             page_count = 0;
2424         bool                            soft_sync = false;
2425         bool                            ndelay = false;
2426         int                             i;
2427         int                             grant = 0;
2428         int                             rc;
2429         __u32                           layout_version = 0;
2430         LIST_HEAD(rpc_list);
2431         struct ost_body                 *body;
2432         ENTRY;
2433         LASSERT(!list_empty(ext_list));
2434
2435         /* add pages into rpc_list to build BRW rpc */
2436         list_for_each_entry(ext, ext_list, oe_link) {
2437                 LASSERT(ext->oe_state == OES_RPC);
2438                 mem_tight |= ext->oe_memalloc;
2439                 grant += ext->oe_grants;
2440                 page_count += ext->oe_nr_pages;
2441                 layout_version = max(layout_version, ext->oe_layout_version);
2442                 if (obj == NULL)
2443                         obj = ext->oe_obj;
2444         }
2445
2446         soft_sync = osc_over_unstable_soft_limit(cli);
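             /* If any extent was marked memalloc (dirtied under memory
              * pressure), build and send this RPC in MEMALLOC context so
              * allocations can dip into reserves rather than recurse into
              * reclaim. */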
2447         if (mem_tight)
2448                 mpflag = memalloc_noreclaim_save();
2449
2450         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2451         if (pga == NULL)
2452                 GOTO(out, rc = -ENOMEM);
2453
2454         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2455         if (oa == NULL)
2456                 GOTO(out, rc = -ENOMEM);
2457
2458         i = 0;
2459         list_for_each_entry(ext, ext_list, oe_link) {
2460                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2461                         if (mem_tight)
2462                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2463                         if (soft_sync)
2464                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2465                         pga[i] = &oap->oap_brw_page;
2466                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2467                         i++;
2468
2469                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2470                         if (starting_offset == OBD_OBJECT_EOF ||
2471                             starting_offset > oap->oap_obj_off)
2472                                 starting_offset = oap->oap_obj_off;
2473                         else
2474                                 LASSERT(oap->oap_page_off == 0);
2475                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2476                                 ending_offset = oap->oap_obj_off +
2477                                                 oap->oap_count;
2478                         else
2479                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2480                                         PAGE_SIZE);
2481                 }
2482                 if (ext->oe_ndelay)
2483                         ndelay = true;
2484         }
2485
2486         /* first page in the list */
2487         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2488
2489         crattr = &osc_env_info(env)->oti_req_attr;
2490         memset(crattr, 0, sizeof(*crattr));
2491         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2492         crattr->cra_flags = ~0ULL;
2493         crattr->cra_page = oap2cl_page(oap);
2494         crattr->cra_oa = oa;
2495         cl_req_attr_set(env, osc2cl(obj), crattr);
2496
2497         if (cmd == OBD_BRW_WRITE) {
2498                 oa->o_grant_used = grant;
2499                 if (layout_version > 0) {
2500                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2501                                PFID(&oa->o_oi.oi_fid), layout_version);
2502
2503                         oa->o_layout_version = layout_version;
2504                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2505                 }
2506         }
2507
2508         sort_brw_pages(pga, page_count);
2509         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2510         if (rc != 0) {
2511                 CERROR("prep_req failed: %d\n", rc);
2512                 GOTO(out, rc);
2513         }
2514
2515         req->rq_commit_cb = brw_commit;
2516         req->rq_interpret_reply = brw_interpret;
2517         req->rq_memalloc = mem_tight != 0;
2518         oap->oap_request = ptlrpc_request_addref(req);
2519         if (ndelay) {
2520                 req->rq_no_resend = req->rq_no_delay = 1;
2521                 /* We should probably set a shorter timeout value here to
2522                  * handle ETIMEDOUT in brw_interpret() correctly. */
2523                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2524         }
2525
2526         /* Need to update the timestamps after the request is built in case
2527          * we race with setattr (locally or in queue at the OST).  If the OST
2528          * gets a later setattr before an earlier BRW (as determined by the
2529          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2530          * is no obvious way to do this in a single call.  bug 10150 */
2531         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2532         crattr->cra_oa = &body->oa;
2533         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2534         cl_req_attr_set(env, osc2cl(obj), crattr);
2535         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2536
2537         aa = ptlrpc_req_async_args(aa, req);
2538         INIT_LIST_HEAD(&aa->aa_oaps);
2539         list_splice_init(&rpc_list, &aa->aa_oaps);
2540         INIT_LIST_HEAD(&aa->aa_exts);
2541         list_splice_init(ext_list, &aa->aa_exts);
2542
2543         spin_lock(&cli->cl_loi_list_lock);
2544         starting_offset >>= PAGE_SHIFT;
2545         if (cmd == OBD_BRW_READ) {
2546                 cli->cl_r_in_flight++;
2547                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2548                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2549                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2550                                       starting_offset + 1);
2551         } else {
2552                 cli->cl_w_in_flight++;
2553                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2554                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2555                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2556                                       starting_offset + 1);
2557         }
2558         spin_unlock(&cli->cl_loi_list_lock);
2559
2560         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2561                   page_count, aa, cli->cl_r_in_flight,
2562                   cli->cl_w_in_flight);
2563         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2564
2565         ptlrpcd_add_req(req);
2566         rc = 0;
2567         EXIT;
2568
2569 out:
2570         if (mem_tight)
2571                 memalloc_noreclaim_restore(mpflag);
2572
2573         if (rc != 0) {
2574                 LASSERT(req == NULL);
2575
2576                 if (oa)
2577                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2578                 if (pga) {
2579                         osc_release_bounce_pages(pga, page_count);
2580                         osc_release_ppga(pga, page_count);
2581                 }
2582                 /* This should happen rarely and is pretty bad; it makes the
2583                  * pending list no longer follow the dirty order. */
2584                 while (!list_empty(ext_list)) {
2585                         ext = list_entry(ext_list->next, struct osc_extent,
2586                                          oe_link);
2587                         list_del_init(&ext->oe_link);
2588                         osc_extent_finish(env, ext, 0, rc);
2589                 }
2590         }
2591         RETURN(rc);
2592 }
2593
2594 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2595 {
2596         int set = 0;
2597
2598         LASSERT(lock != NULL);
2599
2600         lock_res_and_lock(lock);
2601
2602         if (lock->l_ast_data == NULL)
2603                 lock->l_ast_data = data;
2604         if (lock->l_ast_data == data)
2605                 set = 1;
2606
2607         unlock_res_and_lock(lock);
2608
2609         return set;
2610 }
2611
2612 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2613                      void *cookie, struct lustre_handle *lockh,
2614                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2615                      int errcode)
2616 {
2617         bool intent = *flags & LDLM_FL_HAS_INTENT;
2618         int rc;
2619         ENTRY;
2620
2621         /* The request was created before the ldlm_cli_enqueue() call. */
2622         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2623                 struct ldlm_reply *rep;
2624
2625                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2626                 LASSERT(rep != NULL);
2627
2628                 rep->lock_policy_res1 =
2629                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2630                 if (rep->lock_policy_res1)
2631                         errcode = rep->lock_policy_res1;
2632                 if (!speculative)
2633                         *flags |= LDLM_FL_LVB_READY;
2634         } else if (errcode == ELDLM_OK) {
2635                 *flags |= LDLM_FL_LVB_READY;
2636         }
2637
2638         /* Call the update callback. */
2639         rc = (*upcall)(cookie, lockh, errcode);
2640
2641         /* release the reference taken in ldlm_cli_enqueue() */
2642         if (errcode == ELDLM_LOCK_MATCHED)
2643                 errcode = ELDLM_OK;
2644         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2645                 ldlm_lock_decref(lockh, mode);
2646
2647         RETURN(rc);
2648 }
2649
2650 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2651                           void *args, int rc)
2652 {
2653         struct osc_enqueue_args *aa = args;
2654         struct ldlm_lock *lock;
2655         struct lustre_handle *lockh = &aa->oa_lockh;
2656         enum ldlm_mode mode = aa->oa_mode;
2657         struct ost_lvb *lvb = aa->oa_lvb;
2658         __u32 lvb_len = sizeof(*lvb);
2659         __u64 flags = 0;
2660
2661         ENTRY;
2662
2663         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2664          * be valid. */
2665         lock = ldlm_handle2lock(lockh);
2666         LASSERTF(lock != NULL,
2667                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2668                  lockh->cookie, req, aa);
2669
2670         /* Take an additional reference so that a blocking AST that
2671          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2672          * to arrive after an upcall has been executed by
2673          * osc_enqueue_fini(). */
2674         ldlm_lock_addref(lockh, mode);
2675
2676         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2677         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2678
2679         /* Let the CP AST grant the lock first. */
2680         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2681
2682         if (aa->oa_speculative) {
2683                 LASSERT(aa->oa_lvb == NULL);
2684                 LASSERT(aa->oa_flags == NULL);
2685                 aa->oa_flags = &flags;
2686         }
2687
2688         /* Complete obtaining the lock procedure. */
2689         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2690                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2691                                    lockh, rc);
2692         /* Complete osc stuff. */
2693         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2694                               aa->oa_flags, aa->oa_speculative, rc);
2695
2696         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2697
2698         ldlm_lock_decref(lockh, mode);
2699         LDLM_LOCK_PUT(lock);
2700         RETURN(rc);
2701 }
2702
2703 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2704  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2705  * other synchronous requests, but keeping some locks while trying to obtain
2706  * others may take a considerable amount of time in the case of an OST failure;
2707  * and when other sync requests cannot get a lock released by a client, the
2708  * client is evicted from the cluster -- such scenarios make life difficult, so
2709  * release locks just after they are obtained. */
2710 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2711                      __u64 *flags, union ldlm_policy_data *policy,
2712                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2713                      void *cookie, struct ldlm_enqueue_info *einfo,
2714                      struct ptlrpc_request_set *rqset, int async,
2715                      bool speculative)
2716 {
2717         struct obd_device *obd = exp->exp_obd;
2718         struct lustre_handle lockh = { 0 };
2719         struct ptlrpc_request *req = NULL;
2720         int intent = *flags & LDLM_FL_HAS_INTENT;
2721         __u64 match_flags = *flags;
2722         enum ldlm_mode mode;
2723         int rc;
2724         ENTRY;
2725
2726         /* Filesystem lock extents are extended to page boundaries so that
2727          * dealing with the page cache is a little smoother.  */
2728         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2729         policy->l_extent.end |= ~PAGE_MASK;
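        /*
         * For example, with 4KiB pages (~PAGE_MASK == 0xfff) an extent of
         * [5000, 6000] is widened to [4096, 8191]: the start is rounded down
         * to its page boundary and the end up to the last byte of its page.
         */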
2730
2731         /* Next, search for already existing extent locks that will cover us */
2732         /* If we're trying to read, we also search for an existing PW lock.  The
2733          * VFS and page cache already protect us locally, so lots of readers/
2734          * writers can share a single PW lock.
2735          *
2736          * There are problems with conversion deadlocks, so instead of
2737          * converting a read lock to a write lock, we'll just enqueue a new
2738          * one.
2739          *
2740          * At some point we should cancel the read lock instead of making them
2741          * send us a blocking callback, but there are problems with canceling
2742          * locks out from other users right now, too. */
2743         mode = einfo->ei_mode;
2744         if (einfo->ei_mode == LCK_PR)
2745                 mode |= LCK_PW;
2746         /* Normal lock requests must wait for the LVB to be ready before
2747          * matching a lock; speculative lock requests do not need to,
2748          * because they will not actually use the lock. */
2749         if (!speculative)
2750                 match_flags |= LDLM_FL_LVB_READY;
2751         if (intent != 0)
2752                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2753         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2754                                einfo->ei_type, policy, mode, &lockh);
2755         if (mode) {
2756                 struct ldlm_lock *matched;
2757
2758                 if (*flags & LDLM_FL_TEST_LOCK)
2759                         RETURN(ELDLM_OK);
2760
2761                 matched = ldlm_handle2lock(&lockh);
2762                 if (speculative) {
2763                         /* This DLM lock request is speculative, and does not
2764                          * have an associated IO request. Therefore, if there
2765                          * is already a DLM lock, it will just inform the
2766                          * caller to cancel the request for this stripe. */
2767                         lock_res_and_lock(matched);
2768                         if (ldlm_extent_equal(&policy->l_extent,
2769                             &matched->l_policy_data.l_extent))
2770                                 rc = -EEXIST;
2771                         else
2772                                 rc = -ECANCELED;
2773                         unlock_res_and_lock(matched);
2774
2775                         ldlm_lock_decref(&lockh, mode);
2776                         LDLM_LOCK_PUT(matched);
2777                         RETURN(rc);
2778                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2779                         *flags |= LDLM_FL_LVB_READY;
2780
2781                         /* We already have a lock, and it's referenced. */
2782                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2783
2784                         ldlm_lock_decref(&lockh, mode);
2785                         LDLM_LOCK_PUT(matched);
2786                         RETURN(ELDLM_OK);
2787                 } else {
2788                         ldlm_lock_decref(&lockh, mode);
2789                         LDLM_LOCK_PUT(matched);
2790                 }
2791         }
2792
2793         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2794                 RETURN(-ENOLCK);
2795
2796         if (intent) {
2797                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2798                                            &RQF_LDLM_ENQUEUE_LVB);
2799                 if (req == NULL)
2800                         RETURN(-ENOMEM);
2801
2802                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2803                 if (rc) {
2804                         ptlrpc_request_free(req);
2805                         RETURN(rc);
2806                 }
2807
2808                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2809                                      sizeof(*lvb));
2810                 ptlrpc_request_set_replen(req);
2811         }
2812
2813         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2814         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2815
2816         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2817                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2818         if (async) {
2819                 if (!rc) {
2820                         struct osc_enqueue_args *aa;
2821                         aa = ptlrpc_req_async_args(aa, req);
2822                         aa->oa_exp         = exp;
2823                         aa->oa_mode        = einfo->ei_mode;
2824                         aa->oa_type        = einfo->ei_type;
2825                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2826                         aa->oa_upcall      = upcall;
2827                         aa->oa_cookie      = cookie;
2828                         aa->oa_speculative = speculative;
2829                         if (!speculative) {
2830                                 aa->oa_flags  = flags;
2831                                 aa->oa_lvb    = lvb;
2832                         } else {
2833                                 /* speculative locks essentially enqueue
2834                                  * a DLM lock in advance, so we don't care
2835                                  * about the result of the enqueue. */
2836                                 aa->oa_lvb    = NULL;
2837                                 aa->oa_flags  = NULL;
2838                         }
2839
2840                         req->rq_interpret_reply = osc_enqueue_interpret;
2841                         ptlrpc_set_add_req(rqset, req);
2842                 } else if (intent) {
2843                         ptlrpc_req_finished(req);
2844                 }
2845                 RETURN(rc);
2846         }
2847
2848         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2849                               flags, speculative, rc);
2850         if (intent)
2851                 ptlrpc_req_finished(req);
2852
2853         RETURN(rc);
2854 }
2855
2856 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2857                    struct ldlm_res_id *res_id, enum ldlm_type type,
2858                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2859                    __u64 *flags, struct osc_object *obj,
2860                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2861 {
2862         struct obd_device *obd = exp->exp_obd;
2863         __u64 lflags = *flags;
2864         enum ldlm_mode rc;
2865         ENTRY;
2866
2867         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2868                 RETURN(-EIO);
2869
2870         /* Filesystem lock extents are extended to page boundaries so that
2871          * dealing with the page cache is a little smoother */
2872         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2873         policy->l_extent.end |= ~PAGE_MASK;
2874
2875         /* Next, search for already existing extent locks that will cover us */
2876         /* If we're trying to read, we also search for an existing PW lock.  The
2877          * VFS and page cache already protect us locally, so lots of readers/
2878          * writers can share a single PW lock. */
2879         rc = mode;
2880         if (mode == LCK_PR)
2881                 rc |= LCK_PW;
2882
2883         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2884                                         res_id, type, policy, rc, lockh,
2885                                         match_flags);
2886         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2887                 RETURN(rc);
2888
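        /* A matched lock is only useful to @obj if we can claim it via
         * osc_set_lock_data(); when we can, seed the object's attributes
         * from the lock's LVB exactly once, otherwise drop the reference
         * and report that no usable lock was found. */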
2889         if (obj != NULL) {
2890                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2891
2892                 LASSERT(lock != NULL);
2893                 if (osc_set_lock_data(lock, obj)) {
2894                         lock_res_and_lock(lock);
2895                         if (!ldlm_is_lvb_cached(lock)) {
2896                                 LASSERT(lock->l_ast_data == obj);
2897                                 osc_lock_lvb_update(env, obj, lock, NULL);
2898                                 ldlm_set_lvb_cached(lock);
2899                         }
2900                         unlock_res_and_lock(lock);
2901                 } else {
2902                         ldlm_lock_decref(lockh, rc);
2903                         rc = 0;
2904                 }
2905                 LDLM_LOCK_PUT(lock);
2906         }
2907         RETURN(rc);
2908 }
2909
2910 static int osc_statfs_interpret(const struct lu_env *env,
2911                                 struct ptlrpc_request *req, void *args, int rc)
2912 {
2913         struct osc_async_args *aa = args;
2914         struct obd_statfs *msfs;
2915
2916         ENTRY;
2917         if (rc == -EBADR)
2918                 /*
2919                  * The request has in fact never been sent due to issues at
2920                  * a higher level (LOV).  Exit immediately since the caller
2921                  * is aware of the problem and takes care of the clean up.
2922                  */
2923                 RETURN(rc);
2924
2925         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2926             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2927                 GOTO(out, rc = 0);
2928
2929         if (rc != 0)
2930                 GOTO(out, rc);
2931
2932         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2933         if (msfs == NULL)
2934                 GOTO(out, rc = -EPROTO);
2935
2936         *aa->aa_oi->oi_osfs = *msfs;
2937 out:
2938         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2939
2940         RETURN(rc);
2941 }
2942
2943 static int osc_statfs_async(struct obd_export *exp,
2944                             struct obd_info *oinfo, time64_t max_age,
2945                             struct ptlrpc_request_set *rqset)
2946 {
2947         struct obd_device     *obd = class_exp2obd(exp);
2948         struct ptlrpc_request *req;
2949         struct osc_async_args *aa;
2950         int rc;
2951         ENTRY;
2952
2953         if (obd->obd_osfs_age >= max_age) {
2954                 CDEBUG(D_SUPER,
2955                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2956                        obd->obd_name, &obd->obd_osfs,
2957                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2958                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2959                 spin_lock(&obd->obd_osfs_lock);
2960                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2961                 spin_unlock(&obd->obd_osfs_lock);
2962                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2963                 if (oinfo->oi_cb_up)
2964                         oinfo->oi_cb_up(oinfo, 0);
2965
2966                 RETURN(0);
2967         }
2968
2969         /* We could possibly pass max_age in the request (as an absolute
2970          * timestamp or a "seconds.usec ago") so the target can avoid doing
2971          * extra calls into the filesystem if that isn't necessary (e.g.
2972          * during mount that would help a bit).  Having relative timestamps
2973          * is not so great if request processing is slow, while absolute
2974          * timestamps are not ideal because they need time synchronization. */
2975         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2976         if (req == NULL)
2977                 RETURN(-ENOMEM);
2978
2979         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2980         if (rc) {
2981                 ptlrpc_request_free(req);
2982                 RETURN(rc);
2983         }
2984         ptlrpc_request_set_replen(req);
2985         req->rq_request_portal = OST_CREATE_PORTAL;
2986         ptlrpc_at_set_req_timeout(req);
2987
2988         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2989                 /* procfs requests should not wait for statfs, to avoid deadlock */
2990                 req->rq_no_resend = 1;
2991                 req->rq_no_delay = 1;
2992         }
2993
2994         req->rq_interpret_reply = osc_statfs_interpret;
2995         aa = ptlrpc_req_async_args(aa, req);
2996         aa->aa_oi = oinfo;
2997
2998         ptlrpc_set_add_req(rqset, req);
2999         RETURN(0);
3000 }
3001
3002 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3003                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3004 {
3005         struct obd_device     *obd = class_exp2obd(exp);
3006         struct obd_statfs     *msfs;
3007         struct ptlrpc_request *req;
3008         struct obd_import     *imp = NULL;
3009         int rc;
3010         ENTRY;
3011
3012
3013         /* Since the request might also come from lprocfs, we need to
3014          * sync this with client_disconnect_export() (bug 15684). */
3015         down_read(&obd->u.cli.cl_sem);
3016         if (obd->u.cli.cl_import)
3017                 imp = class_import_get(obd->u.cli.cl_import);
3018         up_read(&obd->u.cli.cl_sem);
3019         if (!imp)
3020                 RETURN(-ENODEV);
3021
3022         /* We could possibly pass max_age in the request (as an absolute
3023          * timestamp or a "seconds.usec ago") so the target can avoid doing
3024          * extra calls into the filesystem if that isn't necessary (e.g.
3025          * during mount that would help a bit).  Having relative timestamps
3026          * is not so great if request processing is slow, while absolute
3027          * timestamps are not ideal because they need time synchronization. */
3028         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3029
3030         class_import_put(imp);
3031
3032         if (req == NULL)
3033                 RETURN(-ENOMEM);
3034
3035         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3036         if (rc) {
3037                 ptlrpc_request_free(req);
3038                 RETURN(rc);
3039         }
3040         ptlrpc_request_set_replen(req);
3041         req->rq_request_portal = OST_CREATE_PORTAL;
3042         ptlrpc_at_set_req_timeout(req);
3043
3044         if (flags & OBD_STATFS_NODELAY) {
3045                 /* procfs requests should not wait for statfs, to avoid deadlock */
3046                 req->rq_no_resend = 1;
3047                 req->rq_no_delay = 1;
3048         }
3049
3050         rc = ptlrpc_queue_wait(req);
3051         if (rc)
3052                 GOTO(out, rc);
3053
3054         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3055         if (msfs == NULL)
3056                 GOTO(out, rc = -EPROTO);
3057
3058         *osfs = *msfs;
3059
3060         EXIT;
3061 out:
3062         ptlrpc_req_finished(req);
3063         return rc;
3064 }
3065
3066 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3067                          void *karg, void __user *uarg)
3068 {
3069         struct obd_device *obd = exp->exp_obd;
3070         struct obd_ioctl_data *data = karg;
3071         int rc = 0;
3072
3073         ENTRY;
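        /* Pin the module so it cannot be unloaded while the ioctl is in
         * flight; the reference is dropped at the end of this function. */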
3074         if (!try_module_get(THIS_MODULE)) {
3075                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3076                        module_name(THIS_MODULE));
3077                 return -EINVAL;
3078         }
3079         switch (cmd) {
3080         case OBD_IOC_CLIENT_RECOVER:
3081                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3082                                            data->ioc_inlbuf1, 0);
3083                 if (rc > 0)
3084                         rc = 0;
3085                 break;
3086         case IOC_OSC_SET_ACTIVE:
3087                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3088                                               data->ioc_offset);
3089                 break;
3090         default:
3091                 rc = -ENOTTY;
3092                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3093                        obd->obd_name, cmd, current->comm, rc);
3094                 break;
3095         }
3096
3097         module_put(THIS_MODULE);
3098         return rc;
3099 }
3100
3101 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3102                        u32 keylen, void *key, u32 vallen, void *val,
3103                        struct ptlrpc_request_set *set)
3104 {
3105         struct ptlrpc_request *req;
3106         struct obd_device     *obd = exp->exp_obd;
3107         struct obd_import     *imp = class_exp2cliimp(exp);
3108         char                  *tmp;
3109         int                    rc;
3110         ENTRY;
3111
3112         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3113
3114         if (KEY_IS(KEY_CHECKSUM)) {
3115                 if (vallen != sizeof(int))
3116                         RETURN(-EINVAL);
3117                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118                 RETURN(0);
3119         }
3120
3121         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122                 sptlrpc_conf_client_adapt(obd);
3123                 RETURN(0);
3124         }
3125
3126         if (KEY_IS(KEY_FLUSH_CTX)) {
3127                 sptlrpc_import_flush_my_ctx(imp);
3128                 RETURN(0);
3129         }
3130
3131         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3132                 struct client_obd *cli = &obd->u.cli;
3133                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3134                 long target = *(long *)val;
3135
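                /* Shrink at most half of the local LRU per call and report
                 * how many pages were actually freed by decrementing the
                 * caller's target in place. */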
3136                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3137                 *(long *)val -= nr;
3138                 RETURN(0);
3139         }
3140
3141         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3142                 RETURN(-EINVAL);
3143
3144         /* We pass all other commands directly to the OST. Since nobody calls
3145          * osc methods directly and everybody is supposed to go through LOV, we
3146          * assume LOV checked invalid values for us.
3147          * The only recognised values so far are evict_by_nid and mds_conn.
3148          * Even if something bad goes through, we'd get a -EINVAL from the OST
3149          * anyway. */
3150
3151         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3152                                                 &RQF_OST_SET_GRANT_INFO :
3153                                                 &RQF_OBD_SET_INFO);
3154         if (req == NULL)
3155                 RETURN(-ENOMEM);
3156
3157         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3158                              RCL_CLIENT, keylen);
3159         if (!KEY_IS(KEY_GRANT_SHRINK))
3160                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3161                                      RCL_CLIENT, vallen);
3162         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3163         if (rc) {
3164                 ptlrpc_request_free(req);
3165                 RETURN(rc);
3166         }
3167
3168         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3169         memcpy(tmp, key, keylen);
3170         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3171                                                         &RMF_OST_BODY :
3172                                                         &RMF_SETINFO_VAL);
3173         memcpy(tmp, val, vallen);
3174
3175         if (KEY_IS(KEY_GRANT_SHRINK)) {
3176                 struct osc_grant_args *aa;
3177                 struct obdo *oa;
3178
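                /* Copy the obdo out of the caller's buffer into slab memory
                 * owned by the request: the reply is handled asynchronously,
                 * so @val may be long gone by the time
                 * osc_shrink_grant_interpret() runs. */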
3179                 aa = ptlrpc_req_async_args(aa, req);
3180                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3181                 if (!oa) {
3182                         ptlrpc_req_finished(req);
3183                         RETURN(-ENOMEM);
3184                 }
3185                 *oa = ((struct ost_body *)val)->oa;
3186                 aa->aa_oa = oa;
3187                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3188         }
3189
3190         ptlrpc_request_set_replen(req);
3191         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3192                 LASSERT(set != NULL);
3193                 ptlrpc_set_add_req(set, req);
3194                 ptlrpc_check_set(NULL, set);
3195         } else {
3196                 ptlrpcd_add_req(req);
3197         }
3198
3199         RETURN(0);
3200 }
3201 EXPORT_SYMBOL(osc_set_info_async);
3202
3203 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3204                   struct obd_device *obd, struct obd_uuid *cluuid,
3205                   struct obd_connect_data *data, void *localdata)
3206 {
3207         struct client_obd *cli = &obd->u.cli;
3208
3209         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3210                 long lost_grant;
3211                 long grant;
3212
3213                 spin_lock(&cli->cl_loi_list_lock);
3214                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3215                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3216                         /* restore ocd_grant_blkbits as client page bits */
3217                         data->ocd_grant_blkbits = PAGE_SHIFT;
3218                         grant += cli->cl_dirty_grant;
3219                 } else {
3220                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3221                 }
3222                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
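                /* The GNU "?:" shorthand: request the accumulated grant when
                 * it is non-zero, otherwise fall back to two full BRW-sized
                 * chunks as a reasonable default. */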
3223                 lost_grant = cli->cl_lost_grant;
3224                 cli->cl_lost_grant = 0;
3225                 spin_unlock(&cli->cl_loi_list_lock);
3226
3227                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3228                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3229                        data->ocd_version, data->ocd_grant, lost_grant);
3230         }
3231
3232         RETURN(0);
3233 }
3234 EXPORT_SYMBOL(osc_reconnect);
3235
3236 int osc_disconnect(struct obd_export *exp)
3237 {
3238         struct obd_device *obd = class_exp2obd(exp);
3239         int rc;
3240
3241         rc = client_disconnect_export(exp);
3242         /**
3243          * Initially we put del_shrink_grant before disconnect_export, but it
3244          * causes the following problem if setup (connect) and cleanup
3245          * (disconnect) are tangled together.
3246          *      connect p1                     disconnect p2
3247          *   ptlrpc_connect_import
3248          *     ...............               class_manual_cleanup
3249          *                                     osc_disconnect
3250          *                                     del_shrink_grant
3251          *   ptlrpc_connect_interrupt
3252          *     osc_init_grant
3253          *   add this client to shrink list
3254          *                                      cleanup_osc
3255          * Bang! The grant shrink thread triggers the shrink. BUG18662
3256          */
3257         osc_del_grant_list(&obd->u.cli);
3258         return rc;
3259 }
3260 EXPORT_SYMBOL(osc_disconnect);
3261
3262 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3263                                  struct hlist_node *hnode, void *arg)
3264 {
3265         struct lu_env *env = arg;
3266         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3267         struct ldlm_lock *lock;
3268         struct osc_object *osc = NULL;
3269         ENTRY;
3270
3271         lock_res(res);
3272         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3273                 if (lock->l_ast_data != NULL && osc == NULL) {
3274                         osc = lock->l_ast_data;
3275                         cl_object_get(osc2cl(osc));
3276                 }
3277
3278                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock will be
3279                  * canceled by the 2nd round of the ldlm_namespace_cleanup()
3280                  * call in osc_import_event(). */
3281                 ldlm_clear_cleaned(lock);
3282         }
3283         unlock_res(res);
3284
3285         if (osc != NULL) {
3286                 osc_object_invalidate(env, osc);
3287                 cl_object_put(env, osc2cl(osc));
3288         }
3289
3290         RETURN(0);
3291 }
3292 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3293
3294 static int osc_import_event(struct obd_device *obd,
3295                             struct obd_import *imp,
3296                             enum obd_import_event event)
3297 {
3298         struct client_obd *cli;
3299         int rc = 0;
3300
3301         ENTRY;
3302         LASSERT(imp->imp_obd == obd);
3303
3304         switch (event) {
3305         case IMP_EVENT_DISCON: {
3306                 cli = &obd->u.cli;
3307                 spin_lock(&cli->cl_loi_list_lock);
3308                 cli->cl_avail_grant = 0;
3309                 cli->cl_lost_grant = 0;
3310                 spin_unlock(&cli->cl_loi_list_lock);
3311                 break;
3312         }
3313         case IMP_EVENT_INACTIVE: {
3314                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3315                 break;
3316         }
3317         case IMP_EVENT_INVALIDATE: {
3318                 struct ldlm_namespace *ns = obd->obd_namespace;
3319                 struct lu_env         *env;
3320                 __u16                  refcheck;
3321
3322                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3323
3324                 env = cl_env_get(&refcheck);
3325                 if (!IS_ERR(env)) {
3326                         osc_io_unplug(env, &obd->u.cli, NULL);
3327
3328                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3329                                                  osc_ldlm_resource_invalidate,
3330                                                  env, 0);
3331                         cl_env_put(env, &refcheck);
3332
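                        /* Second round: cancel the locks whose CLEANED flag
                         * was cleared during the resource walk above (see
                         * osc_ldlm_resource_invalidate()). */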
3333                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3334                 } else
3335                         rc = PTR_ERR(env);
3336                 break;
3337         }
3338         case IMP_EVENT_ACTIVE: {
3339                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3340                 break;
3341         }
3342         case IMP_EVENT_OCD: {
3343                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3344
3345                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3346                         osc_init_grant(&obd->u.cli, ocd);
3347
3348                 /* See bug 7198 */
3349                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3350                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3351
3352                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3353                 break;
3354         }
3355         case IMP_EVENT_DEACTIVATE: {
3356                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3357                 break;
3358         }
3359         case IMP_EVENT_ACTIVATE: {
3360                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3361                 break;
3362         }
3363         default:
3364                 CERROR("Unknown import event %d\n", event);
3365                 LBUG();
3366         }
3367         RETURN(rc);
3368 }
3369
3370 /**
3371  * Determine whether the lock can be canceled before replaying the lock
3372  * during recovery, see bug16774 for detailed information.
3373  *
3374  * \retval zero the lock can't be canceled
3375  * \retval other ok to cancel
3376  */
3377 static int osc_cancel_weight(struct ldlm_lock *lock)
3378 {
3379         /*
3380          * Cancel all unused and granted extent locks.
3381          */
3382         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3383             ldlm_is_granted(lock) &&
3384             osc_ldlm_weigh_ast(lock) == 0)
3385                 RETURN(1);
3386
3387         RETURN(0);
3388 }
3389
3390 static int brw_queue_work(const struct lu_env *env, void *data)
3391 {
3392         struct client_obd *cli = data;
3393
3394         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3395
3396         osc_io_unplug(env, cli, NULL);
3397         RETURN(0);
3398 }
3399
3400 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3401 {
3402         struct client_obd *cli = &obd->u.cli;
3403         void *handler;
3404         int rc;
3405
3406         ENTRY;
3407
3408         rc = ptlrpcd_addref();
3409         if (rc)
3410                 RETURN(rc);
3411
3412         rc = client_obd_setup(obd, lcfg);
3413         if (rc)
3414                 GOTO(out_ptlrpcd, rc);
3415
3416
3417         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3418         if (IS_ERR(handler))
3419                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3420         cli->cl_writeback_work = handler;
3421
3422         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3423         if (IS_ERR(handler))
3424                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3425         cli->cl_lru_work = handler;
3426
3427         rc = osc_quota_setup(obd);
3428         if (rc)
3429                 GOTO(out_ptlrpcd_work, rc);
3430
3431         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3432         osc_update_next_shrink(cli);
3433
3434         RETURN(rc);
3435
3436 out_ptlrpcd_work:
3437         if (cli->cl_writeback_work != NULL) {
3438                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3439                 cli->cl_writeback_work = NULL;
3440         }
3441         if (cli->cl_lru_work != NULL) {
3442                 ptlrpcd_destroy_work(cli->cl_lru_work);
3443                 cli->cl_lru_work = NULL;
3444         }
3445         client_obd_cleanup(obd);
3446 out_ptlrpcd:
3447         ptlrpcd_decref();
3448         RETURN(rc);
3449 }
3450 EXPORT_SYMBOL(osc_setup_common);
3451
3452 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3453 {
3454         struct client_obd *cli = &obd->u.cli;
3455         int                adding;
3456         int                added;
3457         int                req_count;
3458         int                rc;
3459
3460         ENTRY;
3461
3462         rc = osc_setup_common(obd, lcfg);
3463         if (rc < 0)
3464                 RETURN(rc);
3465
3466         rc = osc_tunables_init(obd);
3467         if (rc)
3468                 RETURN(rc);
3469
3470         /*
3471          * We try to control the total number of requests with an upper limit,
3472          * osc_reqpool_maxreqcount. There might be some race that causes an
3473          * over-limit allocation, but that is fine.
3474          */
3475         req_count = atomic_read(&osc_pool_req_count);
3476         if (req_count < osc_reqpool_maxreqcount) {
3477                 adding = cli->cl_max_rpcs_in_flight + 2;
3478                 if (req_count + adding > osc_reqpool_maxreqcount)
3479                         adding = osc_reqpool_maxreqcount - req_count;
3480
3481                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3482                 atomic_add(added, &osc_pool_req_count);
3483         }
3484
3485         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3486
3487         spin_lock(&osc_shrink_lock);
3488         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3489         spin_unlock(&osc_shrink_lock);
3490         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3491         cli->cl_import->imp_idle_debug = D_HA;
3492
3493         RETURN(0);
3494 }
3495
3496 int osc_precleanup_common(struct obd_device *obd)
3497 {
3498         struct client_obd *cli = &obd->u.cli;
3499         ENTRY;
3500
3501         /* LU-464
3502          * for echo client, export may be on zombie list, wait for
3503          * zombie thread to cull it, because cli.cl_import will be
3504          * cleared in client_disconnect_export():
3505          *   class_export_destroy() -> obd_cleanup() ->
3506          *   echo_device_free() -> echo_client_cleanup() ->
3507          *   obd_disconnect() -> osc_disconnect() ->
3508          *   client_disconnect_export()
3509          */
3510         obd_zombie_barrier();
3511         if (cli->cl_writeback_work) {
3512                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3513                 cli->cl_writeback_work = NULL;
3514         }
3515
3516         if (cli->cl_lru_work) {
3517                 ptlrpcd_destroy_work(cli->cl_lru_work);
3518                 cli->cl_lru_work = NULL;
3519         }
3520
3521         obd_cleanup_client_import(obd);
3522         RETURN(0);
3523 }
3524 EXPORT_SYMBOL(osc_precleanup_common);
3525
3526 static int osc_precleanup(struct obd_device *obd)
3527 {
3528         ENTRY;
3529
3530         osc_precleanup_common(obd);
3531
3532         ptlrpc_lprocfs_unregister_obd(obd);
3533         RETURN(0);
3534 }
3535
3536 int osc_cleanup_common(struct obd_device *obd)
3537 {
3538         struct client_obd *cli = &obd->u.cli;
3539         int rc;
3540
3541         ENTRY;
3542
3543         spin_lock(&osc_shrink_lock);
3544         list_del(&cli->cl_shrink_list);
3545         spin_unlock(&osc_shrink_lock);
3546
3547         /* lru cleanup */
3548         if (cli->cl_cache != NULL) {
3549                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3550                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3551                 list_del_init(&cli->cl_lru_osc);
3552                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3553                 cli->cl_lru_left = NULL;
3554                 cl_cache_decref(cli->cl_cache);
3555                 cli->cl_cache = NULL;
3556         }
3557
3558         /* free memory of osc quota cache */
3559         osc_quota_cleanup(obd);
3560
3561         rc = client_obd_cleanup(obd);
3562
3563         ptlrpcd_decref();
3564         RETURN(rc);
3565 }
3566 EXPORT_SYMBOL(osc_cleanup_common);
3567
3568 static const struct obd_ops osc_obd_ops = {
3569         .o_owner                = THIS_MODULE,
3570         .o_setup                = osc_setup,
3571         .o_precleanup           = osc_precleanup,
3572         .o_cleanup              = osc_cleanup_common,
3573         .o_add_conn             = client_import_add_conn,
3574         .o_del_conn             = client_import_del_conn,
3575         .o_connect              = client_connect_import,
3576         .o_reconnect            = osc_reconnect,
3577         .o_disconnect           = osc_disconnect,
3578         .o_statfs               = osc_statfs,
3579         .o_statfs_async         = osc_statfs_async,
3580         .o_create               = osc_create,
3581         .o_destroy              = osc_destroy,
3582         .o_getattr              = osc_getattr,
3583         .o_setattr              = osc_setattr,
3584         .o_iocontrol            = osc_iocontrol,
3585         .o_set_info_async       = osc_set_info_async,
3586         .o_import_event         = osc_import_event,
3587         .o_quotactl             = osc_quotactl,
3588 };
3589
3590 static struct shrinker *osc_cache_shrinker;
3591 LIST_HEAD(osc_shrink_list);
3592 DEFINE_SPINLOCK(osc_shrink_lock);
3593
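/* Compatibility shim for kernels whose shrinker API uses a single callback
 * instead of the split ->count_objects()/->scan_objects() pair: perform the
 * scan, then report the remaining object count as the old API expects. */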
3594 #ifndef HAVE_SHRINKER_COUNT
3595 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3596 {
3597         struct shrink_control scv = {
3598                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3599                 .gfp_mask   = shrink_param(sc, gfp_mask)
3600         };
3601         (void)osc_cache_shrink_scan(shrinker, &scv);
3602
3603         return osc_cache_shrink_count(shrinker, &scv);
3604 }
3605 #endif
3606
3607 static int __init osc_init(void)
3608 {
3609         unsigned int reqpool_size;
3610         unsigned int reqsize;
3611         int rc;
3612         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3613                          osc_cache_shrink_count, osc_cache_shrink_scan);
3614         ENTRY;
3615
3616         /* Print the address of _any_ initialized kernel symbol from this
3617          * module, to allow debugging with a gdb that doesn't support data
3618          * symbols from modules. */
3619         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3620
3621         rc = lu_kmem_init(osc_caches);
3622         if (rc)
3623                 RETURN(rc);
3624
3625         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3626                                  LUSTRE_OSC_NAME, &osc_device_type);
3627         if (rc)
3628                 GOTO(out_kmem, rc);
3629
3630         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3631
3632         /* This would obviously be too much memory; only prevent overflow here */
3633         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3634                 GOTO(out_type, rc = -EINVAL);
3635
3636         reqpool_size = osc_reqpool_mem_max << 20;
3637
3638         reqsize = 1;
3639         while (reqsize < OST_IO_MAXREQSIZE)
3640                 reqsize = reqsize << 1;
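        /*
         * reqsize is now OST_IO_MAXREQSIZE rounded up to the next power of
         * two, so with the default osc_reqpool_mem_max of 5 the pool is
         * capped at (5 << 20) / reqsize requests.
         */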
3641
3642         /*
3643          * We don't enlarge the request count in the OSC pool according to
3644          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3645          * after a normal allocation has failed, so a small OSC pool won't
3646          * cause much performance degradation in most cases.
3647          */
3648         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3649
3650         atomic_set(&osc_pool_req_count, 0);
3651         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3652                                           ptlrpc_add_rqs_to_pool);
3653
3654         if (osc_rq_pool == NULL)
3655                 GOTO(out_type, rc = -ENOMEM);
3656
3657         rc = osc_start_grant_work();
3658         if (rc != 0)
3659                 GOTO(out_req_pool, rc);
3660
3661         RETURN(rc);
3662
3663 out_req_pool:
3664         ptlrpc_free_rq_pool(osc_rq_pool);
3665 out_type:
3666         class_unregister_type(LUSTRE_OSC_NAME);
3667 out_kmem:
3668         lu_kmem_fini(osc_caches);
3669
3670         RETURN(rc);
3671 }
3672
3673 static void __exit osc_exit(void)
3674 {
3675         osc_stop_grant_work();
3676         remove_shrinker(osc_cache_shrinker);
3677         class_unregister_type(LUSTRE_OSC_NAME);
3678         lu_kmem_fini(osc_caches);
3679         ptlrpc_free_rq_pool(osc_rq_pool);
3680 }
3681
3682 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3683 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3684 MODULE_VERSION(LUSTRE_VERSION_STRING);
3685 MODULE_LICENSE("GPL");
3686
3687 module_init(osc_init);
3688 module_exit(osc_exit);