Whamcloud - gitweb
LU-14306 sec: get rid of bad rss-counter state messages
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50 #include <linux/falloc.h>
51
52 #include "osc_internal.h"
53
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
57
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
60 module_param(osc_reqpool_mem_max, uint, 0444);
61
62 static int osc_idle_timeout = 20;
63 module_param(osc_idle_timeout, uint, 0644);
64
65 #define osc_grant_args osc_brw_async_args
66
67 struct osc_setattr_args {
68         struct obdo             *sa_oa;
69         obd_enqueue_update_f     sa_upcall;
70         void                    *sa_cookie;
71 };
72
73 struct osc_fsync_args {
74         struct osc_object       *fa_obj;
75         struct obdo             *fa_oa;
76         obd_enqueue_update_f    fa_upcall;
77         void                    *fa_cookie;
78 };
79
80 struct osc_ladvise_args {
81         struct obdo             *la_oa;
82         obd_enqueue_update_f     la_upcall;
83         void                    *la_cookie;
84 };
85
86 static void osc_release_ppga(struct brw_page **ppga, size_t count);
87 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
88                          void *data, int rc);
89
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 {
92         struct ost_body *body;
93
94         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
95         LASSERT(body);
96
97         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
98 }
99
100 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
101                        struct obdo *oa)
102 {
103         struct ptlrpc_request   *req;
104         struct ost_body         *body;
105         int                      rc;
106
107         ENTRY;
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109         if (req == NULL)
110                 RETURN(-ENOMEM);
111
112         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113         if (rc) {
114                 ptlrpc_request_free(req);
115                 RETURN(rc);
116         }
117
118         osc_pack_req_body(req, oa);
119
120         ptlrpc_request_set_replen(req);
121
122         rc = ptlrpc_queue_wait(req);
123         if (rc)
124                 GOTO(out, rc);
125
126         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127         if (body == NULL)
128                 GOTO(out, rc = -EPROTO);
129
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
132
133         oa->o_blksize = cli_brw_size(exp->exp_obd);
134         oa->o_valid |= OBD_MD_FLBLKSZ;
135
136         EXIT;
137 out:
138         ptlrpc_req_finished(req);
139
140         return rc;
141 }
142
143 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
144                        struct obdo *oa)
145 {
146         struct ptlrpc_request   *req;
147         struct ost_body         *body;
148         int                      rc;
149
150         ENTRY;
151         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152
153         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154         if (req == NULL)
155                 RETURN(-ENOMEM);
156
157         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158         if (rc) {
159                 ptlrpc_request_free(req);
160                 RETURN(rc);
161         }
162
163         osc_pack_req_body(req, oa);
164
165         ptlrpc_request_set_replen(req);
166
167         rc = ptlrpc_queue_wait(req);
168         if (rc)
169                 GOTO(out, rc);
170
171         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172         if (body == NULL)
173                 GOTO(out, rc = -EPROTO);
174
175         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176
177         EXIT;
178 out:
179         ptlrpc_req_finished(req);
180
181         RETURN(rc);
182 }
183
184 static int osc_setattr_interpret(const struct lu_env *env,
185                                  struct ptlrpc_request *req, void *args, int rc)
186 {
187         struct osc_setattr_args *sa = args;
188         struct ost_body *body;
189
190         ENTRY;
191
192         if (rc != 0)
193                 GOTO(out, rc);
194
195         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196         if (body == NULL)
197                 GOTO(out, rc = -EPROTO);
198
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
200                              &body->oa);
201 out:
202         rc = sa->sa_upcall(sa->sa_cookie, rc);
203         RETURN(rc);
204 }
205
206 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
207                       obd_enqueue_update_f upcall, void *cookie,
208                       struct ptlrpc_request_set *rqset)
209 {
210         struct ptlrpc_request   *req;
211         struct osc_setattr_args *sa;
212         int                      rc;
213
214         ENTRY;
215
216         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217         if (req == NULL)
218                 RETURN(-ENOMEM);
219
220         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221         if (rc) {
222                 ptlrpc_request_free(req);
223                 RETURN(rc);
224         }
225
226         osc_pack_req_body(req, oa);
227
228         ptlrpc_request_set_replen(req);
229
230         /* do mds to ost setattr asynchronously */
231         if (!rqset) {
232                 /* Do not wait for response. */
233                 ptlrpcd_add_req(req);
234         } else {
235                 req->rq_interpret_reply = osc_setattr_interpret;
236
237                 sa = ptlrpc_req_async_args(sa, req);
238                 sa->sa_oa = oa;
239                 sa->sa_upcall = upcall;
240                 sa->sa_cookie = cookie;
241
242                 ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         ptlrpc_set_add_req(rqset, req);
328
329         RETURN(0);
330 }
331
332 static int osc_create(const struct lu_env *env, struct obd_export *exp,
333                       struct obdo *oa)
334 {
335         struct ptlrpc_request *req;
336         struct ost_body       *body;
337         int                    rc;
338         ENTRY;
339
340         LASSERT(oa != NULL);
341         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
342         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343
344         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345         if (req == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349         if (rc) {
350                 ptlrpc_request_free(req);
351                 GOTO(out, rc);
352         }
353
354         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
355         LASSERT(body);
356
357         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358
359         ptlrpc_request_set_replen(req);
360
361         rc = ptlrpc_queue_wait(req);
362         if (rc)
363                 GOTO(out_req, rc);
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL)
367                 GOTO(out_req, rc = -EPROTO);
368
369         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371
372         oa->o_blksize = cli_brw_size(exp->exp_obd);
373         oa->o_valid |= OBD_MD_FLBLKSZ;
374
375         CDEBUG(D_HA, "transno: %lld\n",
376                lustre_msg_get_transno(req->rq_repmsg));
377 out_req:
378         ptlrpc_req_finished(req);
379 out:
380         RETURN(rc);
381 }
382
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384                    obd_enqueue_update_f upcall, void *cookie)
385 {
386         struct ptlrpc_request *req;
387         struct osc_setattr_args *sa;
388         struct obd_import *imp = class_exp2cliimp(exp);
389         struct ost_body *body;
390         int rc;
391
392         ENTRY;
393
394         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395         if (req == NULL)
396                 RETURN(-ENOMEM);
397
398         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399         if (rc < 0) {
400                 ptlrpc_request_free(req);
401                 RETURN(rc);
402         }
403
404         osc_set_io_portal(req);
405
406         ptlrpc_at_set_req_timeout(req);
407
408         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409
410         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411
412         ptlrpc_request_set_replen(req);
413
414         req->rq_interpret_reply = osc_setattr_interpret;
415         sa = ptlrpc_req_async_args(sa, req);
416         sa->sa_oa = oa;
417         sa->sa_upcall = upcall;
418         sa->sa_cookie = cookie;
419
420         ptlrpcd_add_req(req);
421
422         RETURN(0);
423 }
424 EXPORT_SYMBOL(osc_punch_send);
425
426 /**
427  * osc_fallocate_base() - Handles fallocate request.
428  *
429  * @exp:        Export structure
430  * @oa:         Attributes passed to OSS from client (obdo structure)
431  * @upcall:     Primary & supplementary group information
432  * @cookie:     Exclusive identifier
433  * @rqset:      Request list.
434  * @mode:       Operation done on given range.
435  *
436  * osc_fallocate_base() - Handles fallocate requests only. Only block
437  * allocation or standard preallocate operation is supported currently.
438  * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
439  * is supported via SETATTR request.
440  *
441  * Return: Non-zero on failure and O on success.
442  */
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444                        obd_enqueue_update_f upcall, void *cookie, int mode)
445 {
446         struct ptlrpc_request *req;
447         struct osc_setattr_args *sa;
448         struct ost_body *body;
449         struct obd_import *imp = class_exp2cliimp(exp);
450         int rc;
451         ENTRY;
452
453         /*
454          * Only mode == 0 (which is standard prealloc) is supported now.
455          * Punch is not supported yet.
456          */
457         if (mode & ~FALLOC_FL_KEEP_SIZE)
458                 RETURN(-EOPNOTSUPP);
459         oa->o_falloc_mode = mode;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_OST_FALLOCATE);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
467         if (rc != 0) {
468                 ptlrpc_request_free(req);
469                 RETURN(rc);
470         }
471
472         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
473         LASSERT(body);
474
475         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
476
477         ptlrpc_request_set_replen(req);
478
479         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
480         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481         sa = ptlrpc_req_async_args(sa, req);
482         sa->sa_oa = oa;
483         sa->sa_upcall = upcall;
484         sa->sa_cookie = cookie;
485
486         ptlrpcd_add_req(req);
487
488         RETURN(0);
489 }
490
491 static int osc_sync_interpret(const struct lu_env *env,
492                               struct ptlrpc_request *req, void *args, int rc)
493 {
494         struct osc_fsync_args *fa = args;
495         struct ost_body *body;
496         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
497         unsigned long valid = 0;
498         struct cl_object *obj;
499         ENTRY;
500
501         if (rc != 0)
502                 GOTO(out, rc);
503
504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
505         if (body == NULL) {
506                 CERROR("can't unpack ost_body\n");
507                 GOTO(out, rc = -EPROTO);
508         }
509
510         *fa->fa_oa = body->oa;
511         obj = osc2cl(fa->fa_obj);
512
513         /* Update osc object's blocks attribute */
514         cl_object_attr_lock(obj);
515         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
516                 attr->cat_blocks = body->oa.o_blocks;
517                 valid |= CAT_BLOCKS;
518         }
519
520         if (valid != 0)
521                 cl_object_attr_update(env, obj, attr, valid);
522         cl_object_attr_unlock(obj);
523
524 out:
525         rc = fa->fa_upcall(fa->fa_cookie, rc);
526         RETURN(rc);
527 }
528
529 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
530                   obd_enqueue_update_f upcall, void *cookie,
531                   struct ptlrpc_request_set *rqset)
532 {
533         struct obd_export     *exp = osc_export(obj);
534         struct ptlrpc_request *req;
535         struct ost_body       *body;
536         struct osc_fsync_args *fa;
537         int                    rc;
538         ENTRY;
539
540         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
541         if (req == NULL)
542                 RETURN(-ENOMEM);
543
544         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
545         if (rc) {
546                 ptlrpc_request_free(req);
547                 RETURN(rc);
548         }
549
550         /* overload the size and blocks fields in the oa with start/end */
551         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
552         LASSERT(body);
553         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
554
555         ptlrpc_request_set_replen(req);
556         req->rq_interpret_reply = osc_sync_interpret;
557
558         fa = ptlrpc_req_async_args(fa, req);
559         fa->fa_obj = obj;
560         fa->fa_oa = oa;
561         fa->fa_upcall = upcall;
562         fa->fa_cookie = cookie;
563
564         ptlrpc_set_add_req(rqset, req);
565
566         RETURN (0);
567 }
568
569 /* Find and cancel locally locks matched by @mode in the resource found by
570  * @objid. Found locks are added into @cancel list. Returns the amount of
571  * locks added to @cancels list. */
572 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
573                                    struct list_head *cancels,
574                                    enum ldlm_mode mode, __u64 lock_flags)
575 {
576         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
577         struct ldlm_res_id res_id;
578         struct ldlm_resource *res;
579         int count;
580         ENTRY;
581
582         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
583          * export) but disabled through procfs (flag in NS).
584          *
585          * This distinguishes from a case when ELC is not supported originally,
586          * when we still want to cancel locks in advance and just cancel them
587          * locally, without sending any RPC. */
588         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
589                 RETURN(0);
590
591         ostid_build_res_name(&oa->o_oi, &res_id);
592         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
593         if (IS_ERR(res))
594                 RETURN(0);
595
596         LDLM_RESOURCE_ADDREF(res);
597         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
598                                            lock_flags, 0, NULL);
599         LDLM_RESOURCE_DELREF(res);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(const struct lu_env *env,
605                                  struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         wake_up(&cli->cl_destroy_waitq);
611
612         return 0;
613 }
614
615 static int osc_can_send_destroy(struct client_obd *cli)
616 {
617         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
618             cli->cl_max_rpcs_in_flight) {
619                 /* The destroy request can be sent */
620                 return 1;
621         }
622         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
623             cli->cl_max_rpcs_in_flight) {
624                 /*
625                  * The counter has been modified between the two atomic
626                  * operations.
627                  */
628                 wake_up(&cli->cl_destroy_waitq);
629         }
630         return 0;
631 }
632
633 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
634                        struct obdo *oa)
635 {
636         struct client_obd     *cli = &exp->exp_obd->u.cli;
637         struct ptlrpc_request *req;
638         struct ost_body       *body;
639         LIST_HEAD(cancels);
640         int rc, count;
641         ENTRY;
642
643         if (!oa) {
644                 CDEBUG(D_INFO, "oa NULL\n");
645                 RETURN(-EINVAL);
646         }
647
648         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
649                                         LDLM_FL_DISCARD_DATA);
650
651         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
652         if (req == NULL) {
653                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
654                 RETURN(-ENOMEM);
655         }
656
657         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
658                                0, &cancels, count);
659         if (rc) {
660                 ptlrpc_request_free(req);
661                 RETURN(rc);
662         }
663
664         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
665         ptlrpc_at_set_req_timeout(req);
666
667         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
668         LASSERT(body);
669         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
670
671         ptlrpc_request_set_replen(req);
672
673         req->rq_interpret_reply = osc_destroy_interpret;
674         if (!osc_can_send_destroy(cli)) {
675                 /*
676                  * Wait until the number of on-going destroy RPCs drops
677                  * under max_rpc_in_flight
678                  */
679                 rc = l_wait_event_abortable_exclusive(
680                         cli->cl_destroy_waitq,
681                         osc_can_send_destroy(cli));
682                 if (rc) {
683                         ptlrpc_req_finished(req);
684                         RETURN(-EINTR);
685                 }
686         }
687
688         /* Do not wait for response */
689         ptlrpcd_add_req(req);
690         RETURN(0);
691 }
692
693 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
694                                 long writing_bytes)
695 {
696         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
697
698         LASSERT(!(oa->o_valid & bits));
699
700         oa->o_valid |= bits;
701         spin_lock(&cli->cl_loi_list_lock);
702         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
703                 oa->o_dirty = cli->cl_dirty_grant;
704         else
705                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
706         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
707                 CERROR("dirty %lu > dirty_max %lu\n",
708                        cli->cl_dirty_pages,
709                        cli->cl_dirty_max_pages);
710                 oa->o_undirty = 0;
711         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
712                             (long)(obd_max_dirty_pages + 1))) {
713                 /* The atomic_read() allowing the atomic_inc() are
714                  * not covered by a lock thus they may safely race and trip
715                  * this CERROR() unless we add in a small fudge factor (+1). */
716                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
717                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long nrpages;
727                 unsigned long undirty;
728
729                 nrpages = cli->cl_max_pages_per_rpc;
730                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
731                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
732                 undirty = nrpages << PAGE_SHIFT;
733                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
734                                  GRANT_PARAM)) {
735                         int nrextents;
736
737                         /* take extent tax into account when asking for more
738                          * grant space */
739                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
740                                      cli->cl_max_extent_pages;
741                         undirty += nrextents * cli->cl_grant_extent_tax;
742                 }
743                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
744                  * to add extent tax, etc.
745                  */
746                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
747                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
748         }
749         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
750         /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
751         if (cli->cl_lost_grant > INT_MAX) {
752                 CDEBUG(D_CACHE,
753                       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
754                       cli_name(cli), cli->cl_lost_grant);
755                 oa->o_dropped = INT_MAX;
756         } else {
757                 oa->o_dropped = cli->cl_lost_grant;
758         }
759         cli->cl_lost_grant -= oa->o_dropped;
760         spin_unlock(&cli->cl_loi_list_lock);
761         CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
762                " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
763                oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
764 }
765
766 void osc_update_next_shrink(struct client_obd *cli)
767 {
768         cli->cl_next_shrink_grant = ktime_get_seconds() +
769                                     cli->cl_grant_shrink_interval;
770
771         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
772                cli->cl_next_shrink_grant);
773 }
774
775 static void __osc_update_grant(struct client_obd *cli, u64 grant)
776 {
777         spin_lock(&cli->cl_loi_list_lock);
778         cli->cl_avail_grant += grant;
779         spin_unlock(&cli->cl_loi_list_lock);
780 }
781
782 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
783 {
784         if (body->oa.o_valid & OBD_MD_FLGRANT) {
785                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
786                 __osc_update_grant(cli, body->oa.o_grant);
787         }
788 }
789
790 /**
791  * grant thread data for shrinking space.
792  */
793 struct grant_thread_data {
794         struct list_head        gtd_clients;
795         struct mutex            gtd_mutex;
796         unsigned long           gtd_stopped:1;
797 };
798 static struct grant_thread_data client_gtd;
799
800 static int osc_shrink_grant_interpret(const struct lu_env *env,
801                                       struct ptlrpc_request *req,
802                                       void *args, int rc)
803 {
804         struct osc_grant_args *aa = args;
805         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
806         struct ost_body *body;
807
808         if (rc != 0) {
809                 __osc_update_grant(cli, aa->aa_oa->o_grant);
810                 GOTO(out, rc);
811         }
812
813         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
814         LASSERT(body);
815         osc_update_grant(cli, body);
816 out:
817         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
818         aa->aa_oa = NULL;
819
820         return rc;
821 }
822
823 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
824 {
825         spin_lock(&cli->cl_loi_list_lock);
826         oa->o_grant = cli->cl_avail_grant / 4;
827         cli->cl_avail_grant -= oa->o_grant;
828         spin_unlock(&cli->cl_loi_list_lock);
829         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
830                 oa->o_valid |= OBD_MD_FLFLAGS;
831                 oa->o_flags = 0;
832         }
833         oa->o_flags |= OBD_FL_SHRINK_GRANT;
834         osc_update_next_shrink(cli);
835 }
836
837 /* Shrink the current grant, either from some large amount to enough for a
838  * full set of in-flight RPCs, or if we have already shrunk to that limit
839  * then to enough for a single RPC.  This avoids keeping more grant than
840  * needed, and avoids shrinking the grant piecemeal. */
841 static int osc_shrink_grant(struct client_obd *cli)
842 {
843         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
844                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
845
846         spin_lock(&cli->cl_loi_list_lock);
847         if (cli->cl_avail_grant <= target_bytes)
848                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
849         spin_unlock(&cli->cl_loi_list_lock);
850
851         return osc_shrink_grant_to_target(cli, target_bytes);
852 }
853
854 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
855 {
856         int                     rc = 0;
857         struct ost_body        *body;
858         ENTRY;
859
860         spin_lock(&cli->cl_loi_list_lock);
861         /* Don't shrink if we are already above or below the desired limit
862          * We don't want to shrink below a single RPC, as that will negatively
863          * impact block allocation and long-term performance. */
864         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
865                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
866
867         if (target_bytes >= cli->cl_avail_grant) {
868                 spin_unlock(&cli->cl_loi_list_lock);
869                 RETURN(0);
870         }
871         spin_unlock(&cli->cl_loi_list_lock);
872
873         OBD_ALLOC_PTR(body);
874         if (!body)
875                 RETURN(-ENOMEM);
876
877         osc_announce_cached(cli, &body->oa, 0);
878
879         spin_lock(&cli->cl_loi_list_lock);
880         if (target_bytes >= cli->cl_avail_grant) {
881                 /* available grant has changed since target calculation */
882                 spin_unlock(&cli->cl_loi_list_lock);
883                 GOTO(out_free, rc = 0);
884         }
885         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
886         cli->cl_avail_grant = target_bytes;
887         spin_unlock(&cli->cl_loi_list_lock);
888         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
889                 body->oa.o_valid |= OBD_MD_FLFLAGS;
890                 body->oa.o_flags = 0;
891         }
892         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
893         osc_update_next_shrink(cli);
894
895         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
896                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
897                                 sizeof(*body), body, NULL);
898         if (rc != 0)
899                 __osc_update_grant(cli, body->oa.o_grant);
900 out_free:
901         OBD_FREE_PTR(body);
902         RETURN(rc);
903 }
904
905 static int osc_should_shrink_grant(struct client_obd *client)
906 {
907         time64_t next_shrink = client->cl_next_shrink_grant;
908
909         if (client->cl_import == NULL)
910                 return 0;
911
912         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
913             client->cl_import->imp_grant_shrink_disabled) {
914                 osc_update_next_shrink(client);
915                 return 0;
916         }
917
918         if (ktime_get_seconds() >= next_shrink - 5) {
919                 /* Get the current RPC size directly, instead of going via:
920                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
921                  * Keep comment here so that it can be found by searching. */
922                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
923
924                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
925                     client->cl_avail_grant > brw_size)
926                         return 1;
927                 else
928                         osc_update_next_shrink(client);
929         }
930         return 0;
931 }
932
933 #define GRANT_SHRINK_RPC_BATCH  100
934
935 static struct delayed_work work;
936
937 static void osc_grant_work_handler(struct work_struct *data)
938 {
939         struct client_obd *cli;
940         int rpc_sent;
941         bool init_next_shrink = true;
942         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
943
944         rpc_sent = 0;
945         mutex_lock(&client_gtd.gtd_mutex);
946         list_for_each_entry(cli, &client_gtd.gtd_clients,
947                             cl_grant_chain) {
948                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
949                     osc_should_shrink_grant(cli)) {
950                         osc_shrink_grant(cli);
951                         rpc_sent++;
952                 }
953
954                 if (!init_next_shrink) {
955                         if (cli->cl_next_shrink_grant < next_shrink &&
956                             cli->cl_next_shrink_grant > ktime_get_seconds())
957                                 next_shrink = cli->cl_next_shrink_grant;
958                 } else {
959                         init_next_shrink = false;
960                         next_shrink = cli->cl_next_shrink_grant;
961                 }
962         }
963         mutex_unlock(&client_gtd.gtd_mutex);
964
965         if (client_gtd.gtd_stopped == 1)
966                 return;
967
968         if (next_shrink > ktime_get_seconds()) {
969                 time64_t delay = next_shrink - ktime_get_seconds();
970
971                 schedule_delayed_work(&work, cfs_time_seconds(delay));
972         } else {
973                 schedule_work(&work.work);
974         }
975 }
976
977 void osc_schedule_grant_work(void)
978 {
979         cancel_delayed_work_sync(&work);
980         schedule_work(&work.work);
981 }
982
983 /**
984  * Start grant thread for returing grant to server for idle clients.
985  */
986 static int osc_start_grant_work(void)
987 {
988         client_gtd.gtd_stopped = 0;
989         mutex_init(&client_gtd.gtd_mutex);
990         INIT_LIST_HEAD(&client_gtd.gtd_clients);
991
992         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
993         schedule_work(&work.work);
994
995         return 0;
996 }
997
998 static void osc_stop_grant_work(void)
999 {
1000         client_gtd.gtd_stopped = 1;
1001         cancel_delayed_work_sync(&work);
1002 }
1003
1004 static void osc_add_grant_list(struct client_obd *client)
1005 {
1006         mutex_lock(&client_gtd.gtd_mutex);
1007         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
1008         mutex_unlock(&client_gtd.gtd_mutex);
1009 }
1010
1011 static void osc_del_grant_list(struct client_obd *client)
1012 {
1013         if (list_empty(&client->cl_grant_chain))
1014                 return;
1015
1016         mutex_lock(&client_gtd.gtd_mutex);
1017         list_del_init(&client->cl_grant_chain);
1018         mutex_unlock(&client_gtd.gtd_mutex);
1019 }
1020
1021 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1022 {
1023         /*
1024          * ocd_grant is the total grant amount we're expect to hold: if we've
1025          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1026          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1027          * dirty.
1028          *
1029          * race is tolerable here: if we're evicted, but imp_state already
1030          * left EVICTED state, then cl_dirty_pages must be 0 already.
1031          */
1032         spin_lock(&cli->cl_loi_list_lock);
1033         cli->cl_avail_grant = ocd->ocd_grant;
1034         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1035                 unsigned long consumed = cli->cl_reserved_grant;
1036
1037                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1038                         consumed += cli->cl_dirty_grant;
1039                 else
1040                         consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1041                 if (cli->cl_avail_grant < consumed) {
1042                         CERROR("%s: granted %ld but already consumed %ld\n",
1043                                cli_name(cli), cli->cl_avail_grant, consumed);
1044                         cli->cl_avail_grant = 0;
1045                 } else {
1046                         cli->cl_avail_grant -= consumed;
1047                 }
1048         }
1049
1050         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1051                 u64 size;
1052                 int chunk_mask;
1053
1054                 /* overhead for each extent insertion */
1055                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1056                 /* determine the appropriate chunk size used by osc_extent. */
1057                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1058                                           ocd->ocd_grant_blkbits);
1059                 /* max_pages_per_rpc must be chunk aligned */
1060                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1061                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1062                                              ~chunk_mask) & chunk_mask;
1063                 /* determine maximum extent size, in #pages */
1064                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1065                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
1066                 if (cli->cl_max_extent_pages == 0)
1067                         cli->cl_max_extent_pages = 1;
1068         } else {
1069                 cli->cl_grant_extent_tax = 0;
1070                 cli->cl_chunkbits = PAGE_SHIFT;
1071                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1072         }
1073         spin_unlock(&cli->cl_loi_list_lock);
1074
1075         CDEBUG(D_CACHE,
1076                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1077                cli_name(cli),
1078                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1079                cli->cl_max_extent_pages);
1080
1081         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1082                 osc_add_grant_list(cli);
1083 }
1084 EXPORT_SYMBOL(osc_init_grant);
1085
1086 /* We assume that the reason this OSC got a short read is because it read
1087  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1088  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1089  * this stripe never got written at or beyond this stripe offset yet. */
1090 static void handle_short_read(int nob_read, size_t page_count,
1091                               struct brw_page **pga)
1092 {
1093         char *ptr;
1094         int i = 0;
1095
1096         /* skip bytes read OK */
1097         while (nob_read > 0) {
1098                 LASSERT (page_count > 0);
1099
1100                 if (pga[i]->count > nob_read) {
1101                         /* EOF inside this page */
1102                         ptr = kmap(pga[i]->pg) +
1103                                 (pga[i]->off & ~PAGE_MASK);
1104                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1105                         kunmap(pga[i]->pg);
1106                         page_count--;
1107                         i++;
1108                         break;
1109                 }
1110
1111                 nob_read -= pga[i]->count;
1112                 page_count--;
1113                 i++;
1114         }
1115
1116         /* zero remaining pages */
1117         while (page_count-- > 0) {
1118                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1119                 memset(ptr, 0, pga[i]->count);
1120                 kunmap(pga[i]->pg);
1121                 i++;
1122         }
1123 }
1124
1125 static int check_write_rcs(struct ptlrpc_request *req,
1126                            int requested_nob, int niocount,
1127                            size_t page_count, struct brw_page **pga)
1128 {
1129         int     i;
1130         __u32   *remote_rcs;
1131
1132         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1133                                                   sizeof(*remote_rcs) *
1134                                                   niocount);
1135         if (remote_rcs == NULL) {
1136                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1137                 return(-EPROTO);
1138         }
1139
1140         /* return error if any niobuf was in error */
1141         for (i = 0; i < niocount; i++) {
1142                 if ((int)remote_rcs[i] < 0) {
1143                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1144                                i, remote_rcs[i], req);
1145                         return remote_rcs[i];
1146                 }
1147
1148                 if (remote_rcs[i] != 0) {
1149                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1150                                 i, remote_rcs[i], req);
1151                         return(-EPROTO);
1152                 }
1153         }
1154         if (req->rq_bulk != NULL &&
1155             req->rq_bulk->bd_nob_transferred != requested_nob) {
1156                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1157                        req->rq_bulk->bd_nob_transferred, requested_nob);
1158                 return(-EPROTO);
1159         }
1160
1161         return (0);
1162 }
1163
1164 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1165 {
1166         if (p1->flag != p2->flag) {
1167                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1168                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1169                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1170
1171                 /* warn if we try to combine flags that we don't know to be
1172                  * safe to combine */
1173                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1174                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1175                               "report this at https://jira.whamcloud.com/\n",
1176                               p1->flag, p2->flag);
1177                 }
1178                 return 0;
1179         }
1180
1181         return (p1->off + p1->count == p2->off);
1182 }
1183
1184 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1185 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1186                                    size_t pg_count, struct brw_page **pga,
1187                                    int opc, obd_dif_csum_fn *fn,
1188                                    int sector_size,
1189                                    u32 *check_sum)
1190 {
1191         struct ahash_request *req;
1192         /* Used Adler as the default checksum type on top of DIF tags */
1193         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1194         struct page *__page;
1195         unsigned char *buffer;
1196         __u16 *guard_start;
1197         unsigned int bufsize;
1198         int guard_number;
1199         int used_number = 0;
1200         int used;
1201         u32 cksum;
1202         int rc = 0;
1203         int i = 0;
1204
1205         LASSERT(pg_count > 0);
1206
1207         __page = alloc_page(GFP_KERNEL);
1208         if (__page == NULL)
1209                 return -ENOMEM;
1210
1211         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1212         if (IS_ERR(req)) {
1213                 rc = PTR_ERR(req);
1214                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1215                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1216                 GOTO(out, rc);
1217         }
1218
1219         buffer = kmap(__page);
1220         guard_start = (__u16 *)buffer;
1221         guard_number = PAGE_SIZE / sizeof(*guard_start);
1222         while (nob > 0 && pg_count > 0) {
1223                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1224
1225                 /* corrupt the data before we compute the checksum, to
1226                  * simulate an OST->client data error */
1227                 if (unlikely(i == 0 && opc == OST_READ &&
1228                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1229                         unsigned char *ptr = kmap(pga[i]->pg);
1230                         int off = pga[i]->off & ~PAGE_MASK;
1231
1232                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1233                         kunmap(pga[i]->pg);
1234                 }
1235
1236                 /*
1237                  * The left guard number should be able to hold checksums of a
1238                  * whole page
1239                  */
1240                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1241                                                   pga[i]->off & ~PAGE_MASK,
1242                                                   count,
1243                                                   guard_start + used_number,
1244                                                   guard_number - used_number,
1245                                                   &used, sector_size,
1246                                                   fn);
1247                 if (rc)
1248                         break;
1249
1250                 used_number += used;
1251                 if (used_number == guard_number) {
1252                         cfs_crypto_hash_update_page(req, __page, 0,
1253                                 used_number * sizeof(*guard_start));
1254                         used_number = 0;
1255                 }
1256
1257                 nob -= pga[i]->count;
1258                 pg_count--;
1259                 i++;
1260         }
1261         kunmap(__page);
1262         if (rc)
1263                 GOTO(out, rc);
1264
1265         if (used_number != 0)
1266                 cfs_crypto_hash_update_page(req, __page, 0,
1267                         used_number * sizeof(*guard_start));
1268
1269         bufsize = sizeof(cksum);
1270         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1271
1272         /* For sending we only compute the wrong checksum instead
1273          * of corrupting the data so it is still correct on a redo */
1274         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1275                 cksum++;
1276
1277         *check_sum = cksum;
1278 out:
1279         __free_page(__page);
1280         return rc;
1281 }
1282 #else /* !CONFIG_CRC_T10DIF */
1283 #define obd_dif_ip_fn NULL
1284 #define obd_dif_crc_fn NULL
1285 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1286         -EOPNOTSUPP
1287 #endif /* CONFIG_CRC_T10DIF */
1288
1289 static int osc_checksum_bulk(int nob, size_t pg_count,
1290                              struct brw_page **pga, int opc,
1291                              enum cksum_types cksum_type,
1292                              u32 *cksum)
1293 {
1294         int                             i = 0;
1295         struct ahash_request           *req;
1296         unsigned int                    bufsize;
1297         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1298
1299         LASSERT(pg_count > 0);
1300
1301         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1302         if (IS_ERR(req)) {
1303                 CERROR("Unable to initialize checksum hash %s\n",
1304                        cfs_crypto_hash_name(cfs_alg));
1305                 return PTR_ERR(req);
1306         }
1307
1308         while (nob > 0 && pg_count > 0) {
1309                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1310
1311                 /* corrupt the data before we compute the checksum, to
1312                  * simulate an OST->client data error */
1313                 if (i == 0 && opc == OST_READ &&
1314                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1315                         unsigned char *ptr = kmap(pga[i]->pg);
1316                         int off = pga[i]->off & ~PAGE_MASK;
1317
1318                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1319                         kunmap(pga[i]->pg);
1320                 }
1321                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1322                                             pga[i]->off & ~PAGE_MASK,
1323                                             count);
1324                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1325                                (int)(pga[i]->off & ~PAGE_MASK));
1326
1327                 nob -= pga[i]->count;
1328                 pg_count--;
1329                 i++;
1330         }
1331
1332         bufsize = sizeof(*cksum);
1333         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1334
1335         /* For sending we only compute the wrong checksum instead
1336          * of corrupting the data so it is still correct on a redo */
1337         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1338                 (*cksum)++;
1339
1340         return 0;
1341 }
1342
1343 static int osc_checksum_bulk_rw(const char *obd_name,
1344                                 enum cksum_types cksum_type,
1345                                 int nob, size_t pg_count,
1346                                 struct brw_page **pga, int opc,
1347                                 u32 *check_sum)
1348 {
1349         obd_dif_csum_fn *fn = NULL;
1350         int sector_size = 0;
1351         int rc;
1352
1353         ENTRY;
1354         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1355
1356         if (fn)
1357                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1358                                              opc, fn, sector_size, check_sum);
1359         else
1360                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1361                                        check_sum);
1362
1363         RETURN(rc);
1364 }
1365
1366 static inline void osc_release_bounce_pages(struct brw_page **pga,
1367                                             u32 page_count)
1368 {
1369 #ifdef HAVE_LUSTRE_CRYPTO
1370         int i;
1371
1372         for (i = 0; i < page_count; i++) {
1373                 /* Bounce pages allocated by a call to
1374                  * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
1375                  * are identified thanks to the PageChecked flag.
1376                  */
1377                 if (PageChecked(pga[i]->pg))
1378                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1379                 pga[i]->count -= pga[i]->bp_count_diff;
1380                 pga[i]->off += pga[i]->bp_off_diff;
1381         }
1382 #endif
1383 }
1384
1385 static int
1386 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1387                      u32 page_count, struct brw_page **pga,
1388                      struct ptlrpc_request **reqp, int resend)
1389 {
1390         struct ptlrpc_request *req;
1391         struct ptlrpc_bulk_desc *desc;
1392         struct ost_body *body;
1393         struct obd_ioobj *ioobj;
1394         struct niobuf_remote *niobuf;
1395         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1396         struct osc_brw_async_args *aa;
1397         struct req_capsule *pill;
1398         struct brw_page *pg_prev;
1399         void *short_io_buf;
1400         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1401         struct inode *inode;
1402         bool directio = false;
1403
1404         ENTRY;
1405         inode = page2inode(pga[0]->pg);
1406         if (inode == NULL) {
1407                 /* Try to get reference to inode from cl_page if we are
1408                  * dealing with direct IO, as handled pages are not
1409                  * actual page cache pages.
1410                  */
1411                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1412                 struct cl_page *clpage = oap2cl_page(oap);
1413
1414                 inode = clpage->cp_inode;
1415                 if (inode)
1416                         directio = true;
1417         }
1418         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1419                 RETURN(-ENOMEM); /* Recoverable */
1420         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1421                 RETURN(-EINVAL); /* Fatal */
1422
1423         if ((cmd & OBD_BRW_WRITE) != 0) {
1424                 opc = OST_WRITE;
1425                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1426                                                 osc_rq_pool,
1427                                                 &RQF_OST_BRW_WRITE);
1428         } else {
1429                 opc = OST_READ;
1430                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1431         }
1432         if (req == NULL)
1433                 RETURN(-ENOMEM);
1434
1435         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1436                 for (i = 0; i < page_count; i++) {
1437                         struct brw_page *pg = pga[i];
1438                         struct page *data_page = NULL;
1439                         bool retried = false;
1440                         bool lockedbymyself;
1441                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1442                         struct address_space *map_orig = NULL;
1443                         pgoff_t index_orig;
1444
1445 retry_encrypt:
1446                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1447                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1448                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1449                         /* The page can already be locked when we arrive here.
1450                          * This is possible when cl_page_assume/vvp_page_assume
1451                          * is stuck on wait_on_page_writeback with page lock
1452                          * held. In this case there is no risk for the lock to
1453                          * be released while we are doing our encryption
1454                          * processing, because writeback against that page will
1455                          * end in vvp_page_completion_write/cl_page_completion,
1456                          * which means only once the page is fully processed.
1457                          */
1458                         lockedbymyself = trylock_page(pg->pg);
1459                         if (directio) {
1460                                 map_orig = pg->pg->mapping;
1461                                 pg->pg->mapping = inode->i_mapping;
1462                                 index_orig = pg->pg->index;
1463                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1464                         }
1465                         data_page =
1466                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1467                                                                  nunits, 0,
1468                                                                  GFP_NOFS);
1469                         if (directio) {
1470                                 pg->pg->mapping = map_orig;
1471                                 pg->pg->index = index_orig;
1472                         }
1473                         if (lockedbymyself)
1474                                 unlock_page(pg->pg);
1475                         if (IS_ERR(data_page)) {
1476                                 rc = PTR_ERR(data_page);
1477                                 if (rc == -ENOMEM && !retried) {
1478                                         retried = true;
1479                                         rc = 0;
1480                                         goto retry_encrypt;
1481                                 }
1482                                 ptlrpc_request_free(req);
1483                                 RETURN(rc);
1484                         }
1485                         /* Set PageChecked flag on bounce page for
1486                          * disambiguation in osc_release_bounce_pages().
1487                          */
1488                         SetPageChecked(data_page);
1489                         pg->pg = data_page;
1490                         /* there should be no gap in the middle of page array */
1491                         if (i == page_count - 1) {
1492                                 struct osc_async_page *oap = brw_page2oap(pg);
1493
1494                                 oa->o_size = oap->oap_count +
1495                                         oap->oap_obj_off + oap->oap_page_off;
1496                         }
1497                         /* len is forced to nunits, and relative offset to 0
1498                          * so store the old, clear text info
1499                          */
1500                         pg->bp_count_diff = nunits - pg->count;
1501                         pg->count = nunits;
1502                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1503                         pg->off = pg->off & PAGE_MASK;
1504                 }
1505         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1506                 for (i = 0; i < page_count; i++) {
1507                         struct brw_page *pg = pga[i];
1508                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1509
1510                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1511                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1512                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1513                         /* count/off are forced to cover the whole encryption
1514                          * unit size so that all encrypted data is stored on the
1515                          * OST, so adjust bp_{count,off}_diff for the size of
1516                          * the clear text.
1517                          */
1518                         pg->bp_count_diff = nunits - pg->count;
1519                         pg->count = nunits;
1520                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1521                         pg->off = pg->off & PAGE_MASK;
1522                 }
1523         }
1524
1525         for (niocount = i = 1; i < page_count; i++) {
1526                 if (!can_merge_pages(pga[i - 1], pga[i]))
1527                         niocount++;
1528         }
1529
1530         pill = &req->rq_pill;
1531         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1532                              sizeof(*ioobj));
1533         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1534                              niocount * sizeof(*niobuf));
1535
1536         for (i = 0; i < page_count; i++) {
1537                 short_io_size += pga[i]->count;
1538                 if (!inode || !IS_ENCRYPTED(inode)) {
1539                         pga[i]->bp_count_diff = 0;
1540                         pga[i]->bp_off_diff = 0;
1541                 }
1542         }
1543
1544         /* Check if read/write is small enough to be a short io. */
1545         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1546             !imp_connect_shortio(cli->cl_import))
1547                 short_io_size = 0;
1548
1549         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1550                              opc == OST_READ ? 0 : short_io_size);
1551         if (opc == OST_READ)
1552                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1553                                      short_io_size);
1554
1555         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1556         if (rc) {
1557                 ptlrpc_request_free(req);
1558                 RETURN(rc);
1559         }
1560         osc_set_io_portal(req);
1561
1562         ptlrpc_at_set_req_timeout(req);
1563         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1564          * retry logic */
1565         req->rq_no_retry_einprogress = 1;
1566
1567         if (short_io_size != 0) {
1568                 desc = NULL;
1569                 short_io_buf = NULL;
1570                 goto no_bulk;
1571         }
1572
1573         desc = ptlrpc_prep_bulk_imp(req, page_count,
1574                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1575                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1576                         PTLRPC_BULK_PUT_SINK),
1577                 OST_BULK_PORTAL,
1578                 &ptlrpc_bulk_kiov_pin_ops);
1579
1580         if (desc == NULL)
1581                 GOTO(out, rc = -ENOMEM);
1582         /* NB request now owns desc and will free it when it gets freed */
1583 no_bulk:
1584         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1585         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1586         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1587         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1588
1589         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1590
1591         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1592          * and from_kgid(), because they are asynchronous. Fortunately, variable
1593          * oa contains valid o_uid and o_gid in these two operations.
1594          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1595          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1596          * other process logic */
1597         body->oa.o_uid = oa->o_uid;
1598         body->oa.o_gid = oa->o_gid;
1599
1600         obdo_to_ioobj(oa, ioobj);
1601         ioobj->ioo_bufcnt = niocount;
1602         /* The high bits of ioo_max_brw tell server _maximum_ number of bulks
1603          * that might be sent for this request.  The actual number is decided
1604          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1605          * "max - 1" for old client compatibility sending "0", and also so that
1606          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1607         if (desc != NULL)
1608                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1609         else /* short io */
1610                 ioobj_max_brw_set(ioobj, 0);
1611
1612         if (short_io_size != 0) {
1613                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1614                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1615                         body->oa.o_flags = 0;
1616                 }
1617                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1618                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1619                        short_io_size);
1620                 if (opc == OST_WRITE) {
1621                         short_io_buf = req_capsule_client_get(pill,
1622                                                               &RMF_SHORT_IO);
1623                         LASSERT(short_io_buf != NULL);
1624                 }
1625         }
1626
1627         LASSERT(page_count > 0);
1628         pg_prev = pga[0];
1629         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1630                 struct brw_page *pg = pga[i];
1631                 int poff = pg->off & ~PAGE_MASK;
1632
1633                 LASSERT(pg->count > 0);
1634                 /* make sure there is no gap in the middle of page array */
1635                 LASSERTF(page_count == 1 ||
1636                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1637                           ergo(i > 0 && i < page_count - 1,
1638                                poff == 0 && pg->count == PAGE_SIZE)   &&
1639                           ergo(i == page_count - 1, poff == 0)),
1640                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1641                          i, page_count, pg, pg->off, pg->count);
1642                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1643                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1644                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1645                          i, page_count,
1646                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1647                          pg_prev->pg, page_private(pg_prev->pg),
1648                          pg_prev->pg->index, pg_prev->off);
1649                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1650                         (pg->flag & OBD_BRW_SRVLOCK));
1651                 if (short_io_size != 0 && opc == OST_WRITE) {
1652                         unsigned char *ptr = kmap_atomic(pg->pg);
1653
1654                         LASSERT(short_io_size >= requested_nob + pg->count);
1655                         memcpy(short_io_buf + requested_nob,
1656                                ptr + poff,
1657                                pg->count);
1658                         kunmap_atomic(ptr);
1659                 } else if (short_io_size == 0) {
1660                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1661                                                          pg->count);
1662                 }
1663                 requested_nob += pg->count;
1664
1665                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1666                         niobuf--;
1667                         niobuf->rnb_len += pg->count;
1668                 } else {
1669                         niobuf->rnb_offset = pg->off;
1670                         niobuf->rnb_len    = pg->count;
1671                         niobuf->rnb_flags  = pg->flag;
1672                 }
1673                 pg_prev = pg;
1674         }
1675
1676         LASSERTF((void *)(niobuf - niocount) ==
1677                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1678                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1679                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1680
1681         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1682         if (resend) {
1683                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1684                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1685                         body->oa.o_flags = 0;
1686                 }
1687                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1688         }
1689
1690         if (osc_should_shrink_grant(cli))
1691                 osc_shrink_grant_local(cli, &body->oa);
1692
1693         /* size[REQ_REC_OFF] still sizeof (*body) */
1694         if (opc == OST_WRITE) {
1695                 if (cli->cl_checksum &&
1696                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1697                         /* store cl_cksum_type in a local variable since
1698                          * it can be changed via lprocfs */
1699                         enum cksum_types cksum_type = cli->cl_cksum_type;
1700
1701                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1702                                 body->oa.o_flags = 0;
1703
1704                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1705                                                                 cksum_type);
1706                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1707
1708                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1709                                                   requested_nob, page_count,
1710                                                   pga, OST_WRITE,
1711                                                   &body->oa.o_cksum);
1712                         if (rc < 0) {
1713                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1714                                        rc);
1715                                 GOTO(out, rc);
1716                         }
1717                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1718                                body->oa.o_cksum);
1719
1720                         /* save this in 'oa', too, for later checking */
1721                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1722                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1723                                                            cksum_type);
1724                 } else {
1725                         /* clear out the checksum flag, in case this is a
1726                          * resend but cl_checksum is no longer set. b=11238 */
1727                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1728                 }
1729                 oa->o_cksum = body->oa.o_cksum;
1730                 /* 1 RC per niobuf */
1731                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1732                                      sizeof(__u32) * niocount);
1733         } else {
1734                 if (cli->cl_checksum &&
1735                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1736                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1737                                 body->oa.o_flags = 0;
1738                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1739                                 cli->cl_cksum_type);
1740                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1741                 }
1742
1743                 /* Client cksum has been already copied to wire obdo in previous
1744                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1745                  * resent due to cksum error, this will allow Server to
1746                  * check+dump pages on its side */
1747         }
1748         ptlrpc_request_set_replen(req);
1749
1750         aa = ptlrpc_req_async_args(aa, req);
1751         aa->aa_oa = oa;
1752         aa->aa_requested_nob = requested_nob;
1753         aa->aa_nio_count = niocount;
1754         aa->aa_page_count = page_count;
1755         aa->aa_resends = 0;
1756         aa->aa_ppga = pga;
1757         aa->aa_cli = cli;
1758         INIT_LIST_HEAD(&aa->aa_oaps);
1759
1760         *reqp = req;
1761         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1762         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1763                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1764                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1765         RETURN(0);
1766
1767  out:
1768         ptlrpc_req_finished(req);
1769         RETURN(rc);
1770 }
1771
1772 char dbgcksum_file_name[PATH_MAX];
1773
1774 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1775                                 struct brw_page **pga, __u32 server_cksum,
1776                                 __u32 client_cksum)
1777 {
1778         struct file *filp;
1779         int rc, i;
1780         unsigned int len;
1781         char *buf;
1782
1783         /* will only keep dump of pages on first error for the same range in
1784          * file/fid, not during the resends/retries. */
1785         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1786                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1787                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1788                   libcfs_debug_file_path_arr :
1789                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1790                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1791                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1792                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1793                  pga[0]->off,
1794                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1795                  client_cksum, server_cksum);
1796         filp = filp_open(dbgcksum_file_name,
1797                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1798         if (IS_ERR(filp)) {
1799                 rc = PTR_ERR(filp);
1800                 if (rc == -EEXIST)
1801                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1802                                "checksum error: rc = %d\n", dbgcksum_file_name,
1803                                rc);
1804                 else
1805                         CERROR("%s: can't open to dump pages with checksum "
1806                                "error: rc = %d\n", dbgcksum_file_name, rc);
1807                 return;
1808         }
1809
1810         for (i = 0; i < page_count; i++) {
1811                 len = pga[i]->count;
1812                 buf = kmap(pga[i]->pg);
1813                 while (len != 0) {
1814                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1815                         if (rc < 0) {
1816                                 CERROR("%s: wanted to write %u but got %d "
1817                                        "error\n", dbgcksum_file_name, len, rc);
1818                                 break;
1819                         }
1820                         len -= rc;
1821                         buf += rc;
1822                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1823                                dbgcksum_file_name, rc);
1824                 }
1825                 kunmap(pga[i]->pg);
1826         }
1827
1828         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1829         if (rc)
1830                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1831         filp_close(filp, NULL);
1832 }
1833
1834 static int
1835 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1836                      __u32 client_cksum, __u32 server_cksum,
1837                      struct osc_brw_async_args *aa)
1838 {
1839         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1840         enum cksum_types cksum_type;
1841         obd_dif_csum_fn *fn = NULL;
1842         int sector_size = 0;
1843         __u32 new_cksum;
1844         char *msg;
1845         int rc;
1846
1847         if (server_cksum == client_cksum) {
1848                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1849                 return 0;
1850         }
1851
1852         if (aa->aa_cli->cl_checksum_dump)
1853                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1854                                     server_cksum, client_cksum);
1855
1856         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1857                                            oa->o_flags : 0);
1858
1859         switch (cksum_type) {
1860         case OBD_CKSUM_T10IP512:
1861                 fn = obd_dif_ip_fn;
1862                 sector_size = 512;
1863                 break;
1864         case OBD_CKSUM_T10IP4K:
1865                 fn = obd_dif_ip_fn;
1866                 sector_size = 4096;
1867                 break;
1868         case OBD_CKSUM_T10CRC512:
1869                 fn = obd_dif_crc_fn;
1870                 sector_size = 512;
1871                 break;
1872         case OBD_CKSUM_T10CRC4K:
1873                 fn = obd_dif_crc_fn;
1874                 sector_size = 4096;
1875                 break;
1876         default:
1877                 break;
1878         }
1879
1880         if (fn)
1881                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1882                                              aa->aa_page_count, aa->aa_ppga,
1883                                              OST_WRITE, fn, sector_size,
1884                                              &new_cksum);
1885         else
1886                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1887                                        aa->aa_ppga, OST_WRITE, cksum_type,
1888                                        &new_cksum);
1889
1890         if (rc < 0)
1891                 msg = "failed to calculate the client write checksum";
1892         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1893                 msg = "the server did not use the checksum type specified in "
1894                       "the original request - likely a protocol problem";
1895         else if (new_cksum == server_cksum)
1896                 msg = "changed on the client after we checksummed it - "
1897                       "likely false positive due to mmap IO (bug 11742)";
1898         else if (new_cksum == client_cksum)
1899                 msg = "changed in transit before arrival at OST";
1900         else
1901                 msg = "changed in transit AND doesn't match the original - "
1902                       "likely false positive due to mmap IO (bug 11742)";
1903
1904         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1905                            DFID " object "DOSTID" extent [%llu-%llu], original "
1906                            "client csum %x (type %x), server csum %x (type %x),"
1907                            " client csum now %x\n",
1908                            obd_name, msg, libcfs_nid2str(peer->nid),
1909                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1910                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1911                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1912                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1913                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1914                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1915                            client_cksum,
1916                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1917                            server_cksum, cksum_type, new_cksum);
1918         return 1;
1919 }
1920
1921 /* Note rc enters this function as number of bytes transferred */
1922 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1923 {
1924         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1925         struct client_obd *cli = aa->aa_cli;
1926         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1927         const struct lnet_process_id *peer =
1928                 &req->rq_import->imp_connection->c_peer;
1929         struct ost_body *body;
1930         u32 client_cksum = 0;
1931         struct inode *inode;
1932         unsigned int blockbits = 0, blocksize = 0;
1933
1934         ENTRY;
1935
1936         if (rc < 0 && rc != -EDQUOT) {
1937                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1938                 RETURN(rc);
1939         }
1940
1941         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1942         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1943         if (body == NULL) {
1944                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1945                 RETURN(-EPROTO);
1946         }
1947
1948         /* set/clear over quota flag for a uid/gid/projid */
1949         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1950             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1951                 unsigned qid[LL_MAXQUOTAS] = {
1952                                          body->oa.o_uid, body->oa.o_gid,
1953                                          body->oa.o_projid };
1954                 CDEBUG(D_QUOTA,
1955                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1956                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1957                        body->oa.o_valid, body->oa.o_flags);
1958                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1959                                        body->oa.o_flags);
1960         }
1961
1962         osc_update_grant(cli, body);
1963
1964         if (rc < 0)
1965                 RETURN(rc);
1966
1967         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1968                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1969
1970         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1971                 if (rc > 0) {
1972                         CERROR("%s: unexpected positive size %d\n",
1973                                obd_name, rc);
1974                         RETURN(-EPROTO);
1975                 }
1976
1977                 if (req->rq_bulk != NULL &&
1978                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1979                         RETURN(-EAGAIN);
1980
1981                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1982                     check_write_checksum(&body->oa, peer, client_cksum,
1983                                          body->oa.o_cksum, aa))
1984                         RETURN(-EAGAIN);
1985
1986                 rc = check_write_rcs(req, aa->aa_requested_nob,
1987                                      aa->aa_nio_count, aa->aa_page_count,
1988                                      aa->aa_ppga);
1989                 GOTO(out, rc);
1990         }
1991
1992         /* The rest of this function executes only for OST_READs */
1993
1994         if (req->rq_bulk == NULL) {
1995                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1996                                           RCL_SERVER);
1997                 LASSERT(rc == req->rq_status);
1998         } else {
1999                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2000                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2001         }
2002         if (rc < 0)
2003                 GOTO(out, rc = -EAGAIN);
2004
2005         if (rc > aa->aa_requested_nob) {
2006                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2007                        rc, aa->aa_requested_nob);
2008                 RETURN(-EPROTO);
2009         }
2010
2011         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2012                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2013                        rc, req->rq_bulk->bd_nob_transferred);
2014                 RETURN(-EPROTO);
2015         }
2016
2017         if (req->rq_bulk == NULL) {
2018                 /* short io */
2019                 int nob, pg_count, i = 0;
2020                 unsigned char *buf;
2021
2022                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2023                 pg_count = aa->aa_page_count;
2024                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2025                                                    rc);
2026                 nob = rc;
2027                 while (nob > 0 && pg_count > 0) {
2028                         unsigned char *ptr;
2029                         int count = aa->aa_ppga[i]->count > nob ?
2030                                     nob : aa->aa_ppga[i]->count;
2031
2032                         CDEBUG(D_CACHE, "page %p count %d\n",
2033                                aa->aa_ppga[i]->pg, count);
2034                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2035                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2036                                count);
2037                         kunmap_atomic((void *) ptr);
2038
2039                         buf += count;
2040                         nob -= count;
2041                         i++;
2042                         pg_count--;
2043                 }
2044         }
2045
2046         if (rc < aa->aa_requested_nob)
2047                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2048
2049         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2050                 static int cksum_counter;
2051                 u32        server_cksum = body->oa.o_cksum;
2052                 char      *via = "";
2053                 char      *router = "";
2054                 enum cksum_types cksum_type;
2055                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2056                         body->oa.o_flags : 0;
2057
2058                 cksum_type = obd_cksum_type_unpack(o_flags);
2059                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2060                                           aa->aa_page_count, aa->aa_ppga,
2061                                           OST_READ, &client_cksum);
2062                 if (rc < 0)
2063                         GOTO(out, rc);
2064
2065                 if (req->rq_bulk != NULL &&
2066                     peer->nid != req->rq_bulk->bd_sender) {
2067                         via = " via ";
2068                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2069                 }
2070
2071                 if (server_cksum != client_cksum) {
2072                         struct ost_body *clbody;
2073                         u32 page_count = aa->aa_page_count;
2074
2075                         clbody = req_capsule_client_get(&req->rq_pill,
2076                                                         &RMF_OST_BODY);
2077                         if (cli->cl_checksum_dump)
2078                                 dump_all_bulk_pages(&clbody->oa, page_count,
2079                                                     aa->aa_ppga, server_cksum,
2080                                                     client_cksum);
2081
2082                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2083                                            "%s%s%s inode "DFID" object "DOSTID
2084                                            " extent [%llu-%llu], client %x, "
2085                                            "server %x, cksum_type %x\n",
2086                                            obd_name,
2087                                            libcfs_nid2str(peer->nid),
2088                                            via, router,
2089                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2090                                                 clbody->oa.o_parent_seq : 0ULL,
2091                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2092                                                 clbody->oa.o_parent_oid : 0,
2093                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2094                                                 clbody->oa.o_parent_ver : 0,
2095                                            POSTID(&body->oa.o_oi),
2096                                            aa->aa_ppga[0]->off,
2097                                            aa->aa_ppga[page_count-1]->off +
2098                                            aa->aa_ppga[page_count-1]->count - 1,
2099                                            client_cksum, server_cksum,
2100                                            cksum_type);
2101                         cksum_counter = 0;
2102                         aa->aa_oa->o_cksum = client_cksum;
2103                         rc = -EAGAIN;
2104                 } else {
2105                         cksum_counter++;
2106                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2107                         rc = 0;
2108                 }
2109         } else if (unlikely(client_cksum)) {
2110                 static int cksum_missed;
2111
2112                 cksum_missed++;
2113                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2114                         CERROR("%s: checksum %u requested from %s but not sent\n",
2115                                obd_name, cksum_missed,
2116                                libcfs_nid2str(peer->nid));
2117         } else {
2118                 rc = 0;
2119         }
2120
2121         inode = page2inode(aa->aa_ppga[0]->pg);
2122         if (inode == NULL) {
2123                 /* Try to get reference to inode from cl_page if we are
2124                  * dealing with direct IO, as handled pages are not
2125                  * actual page cache pages.
2126                  */
2127                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2128
2129                 inode = oap2cl_page(oap)->cp_inode;
2130                 if (inode) {
2131                         blockbits = inode->i_blkbits;
2132                         blocksize = 1 << blockbits;
2133                 }
2134         }
2135         if (inode && IS_ENCRYPTED(inode)) {
2136                 int idx;
2137
2138                 if (!llcrypt_has_encryption_key(inode)) {
2139                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2140                         GOTO(out, rc);
2141                 }
2142                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2143                         struct brw_page *pg = aa->aa_ppga[idx];
2144                         unsigned int offs = 0;
2145
2146                         while (offs < PAGE_SIZE) {
2147                                 /* do not decrypt if page is all 0s */
2148                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2149                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2150                                         /* if page is empty forward info to
2151                                          * upper layers (ll_io_zero_page) by
2152                                          * clearing PagePrivate2
2153                                          */
2154                                         if (!offs)
2155                                                 ClearPagePrivate2(pg->pg);
2156                                         break;
2157                                 }
2158
2159                                 if (blockbits) {
2160                                         /* This is direct IO case. Directly call
2161                                          * decrypt function that takes inode as
2162                                          * input parameter. Page does not need
2163                                          * to be locked.
2164                                          */
2165                                         u64 lblk_num =
2166                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2167                                                      (PAGE_SHIFT - blockbits)) +
2168                                                        (offs >> blockbits);
2169                                         unsigned int i;
2170
2171                                         for (i = offs;
2172                                              i < offs +
2173                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2174                                              i += blocksize, lblk_num++) {
2175                                                 rc =
2176                                                   llcrypt_decrypt_block_inplace(
2177                                                           inode, pg->pg,
2178                                                           blocksize, i,
2179                                                           lblk_num);
2180                                                 if (rc)
2181                                                         break;
2182                                         }
2183                                 } else {
2184                                         rc = llcrypt_decrypt_pagecache_blocks(
2185                                                 pg->pg,
2186                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2187                                                 offs);
2188                                 }
2189                                 if (rc)
2190                                         GOTO(out, rc);
2191
2192                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2193                         }
2194                 }
2195         }
2196
2197 out:
2198         if (rc >= 0)
2199                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2200                                      aa->aa_oa, &body->oa);
2201
2202         RETURN(rc);
2203 }
2204
2205 static int osc_brw_redo_request(struct ptlrpc_request *request,
2206                                 struct osc_brw_async_args *aa, int rc)
2207 {
2208         struct ptlrpc_request *new_req;
2209         struct osc_brw_async_args *new_aa;
2210         struct osc_async_page *oap;
2211         ENTRY;
2212
2213         /* The below message is checked in replay-ost-single.sh test_8ae*/
2214         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2215                   "redo for recoverable error %d", rc);
2216
2217         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2218                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2219                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2220                                   aa->aa_ppga, &new_req, 1);
2221         if (rc)
2222                 RETURN(rc);
2223
2224         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2225                 if (oap->oap_request != NULL) {
2226                         LASSERTF(request == oap->oap_request,
2227                                  "request %p != oap_request %p\n",
2228                                  request, oap->oap_request);
2229                 }
2230         }
2231         /*
2232          * New request takes over pga and oaps from old request.
2233          * Note that copying a list_head doesn't work, need to move it...
2234          */
2235         aa->aa_resends++;
2236         new_req->rq_interpret_reply = request->rq_interpret_reply;
2237         new_req->rq_async_args = request->rq_async_args;
2238         new_req->rq_commit_cb = request->rq_commit_cb;
2239         /* cap resend delay to the current request timeout, this is similar to
2240          * what ptlrpc does (see after_reply()) */
2241         if (aa->aa_resends > new_req->rq_timeout)
2242                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2243         else
2244                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2245         new_req->rq_generation_set = 1;
2246         new_req->rq_import_generation = request->rq_import_generation;
2247
2248         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2249
2250         INIT_LIST_HEAD(&new_aa->aa_oaps);
2251         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2252         INIT_LIST_HEAD(&new_aa->aa_exts);
2253         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2254         new_aa->aa_resends = aa->aa_resends;
2255
2256         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2257                 if (oap->oap_request) {
2258                         ptlrpc_req_finished(oap->oap_request);
2259                         oap->oap_request = ptlrpc_request_addref(new_req);
2260                 }
2261         }
2262
2263         /* XXX: This code will run into problem if we're going to support
2264          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
2265          * and wait for all of them to be finished. We should inherit request
2266          * set from old request. */
2267         ptlrpcd_add_req(new_req);
2268
2269         DEBUG_REQ(D_INFO, new_req, "new request");
2270         RETURN(0);
2271 }
2272
2273 /*
2274  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2275  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2276  * fine for our small page arrays and doesn't require allocation.  its an
2277  * insertion sort that swaps elements that are strides apart, shrinking the
2278  * stride down until its '1' and the array is sorted.
2279  */
2280 static void sort_brw_pages(struct brw_page **array, int num)
2281 {
2282         int stride, i, j;
2283         struct brw_page *tmp;
2284
2285         if (num == 1)
2286                 return;
2287         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2288                 ;
2289
2290         do {
2291                 stride /= 3;
2292                 for (i = stride ; i < num ; i++) {
2293                         tmp = array[i];
2294                         j = i;
2295                         while (j >= stride && array[j - stride]->off > tmp->off) {
2296                                 array[j] = array[j - stride];
2297                                 j -= stride;
2298                         }
2299                         array[j] = tmp;
2300                 }
2301         } while (stride > 1);
2302 }
2303
/*
 * Release the brw_page pointer array @ppga, which holds @count entries,
 * through OBD_FREE_PTR_ARRAY().  @ppga must not be NULL (asserted).
 *
 * NOTE(review): presumably only the pointer array itself is released here,
 * not the pages it points to -- confirm against OBD_FREE_PTR_ARRAY().
 */
2304 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2305 {
2306         LASSERT(ppga != NULL);
2307         OBD_FREE_PTR_ARRAY(ppga, count);
2308 }
2309
2310 static int brw_interpret(const struct lu_env *env,
2311                          struct ptlrpc_request *req, void *args, int rc)
2312 {
2313         struct osc_brw_async_args *aa = args;
2314         struct osc_extent *ext;
2315         struct osc_extent *tmp;
2316         struct client_obd *cli = aa->aa_cli;
2317         unsigned long transferred = 0;
2318
2319         ENTRY;
2320
2321         rc = osc_brw_fini_request(req, rc);
2322         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2323
2324         /* restore clear text pages */
2325         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2326
2327         /*
2328          * When server returns -EINPROGRESS, client should always retry
2329          * regardless of the number of times the bulk was resent already.
2330          */
2331         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2332                 if (req->rq_import_generation !=
2333                     req->rq_import->imp_generation) {
2334                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2335                                ""DOSTID", rc = %d.\n",
2336                                req->rq_import->imp_obd->obd_name,
2337                                POSTID(&aa->aa_oa->o_oi), rc);
2338                 } else if (rc == -EINPROGRESS ||
2339                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2340                         rc = osc_brw_redo_request(req, aa, rc);
2341                 } else {
2342                         CERROR("%s: too many resent retries for object: "
2343                                "%llu:%llu, rc = %d.\n",
2344                                req->rq_import->imp_obd->obd_name,
2345                                POSTID(&aa->aa_oa->o_oi), rc);
2346                 }
2347
2348                 if (rc == 0)
2349                         RETURN(0);
2350                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2351                         rc = -EIO;
2352         }
2353
2354         if (rc == 0) {
2355                 struct obdo *oa = aa->aa_oa;
2356                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2357                 unsigned long valid = 0;
2358                 struct cl_object *obj;
2359                 struct osc_async_page *last;
2360
2361                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2362                 obj = osc2cl(last->oap_obj);
2363
2364                 cl_object_attr_lock(obj);
2365                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2366                         attr->cat_blocks = oa->o_blocks;
2367                         valid |= CAT_BLOCKS;
2368                 }
2369                 if (oa->o_valid & OBD_MD_FLMTIME) {
2370                         attr->cat_mtime = oa->o_mtime;
2371                         valid |= CAT_MTIME;
2372                 }
2373                 if (oa->o_valid & OBD_MD_FLATIME) {
2374                         attr->cat_atime = oa->o_atime;
2375                         valid |= CAT_ATIME;
2376                 }
2377                 if (oa->o_valid & OBD_MD_FLCTIME) {
2378                         attr->cat_ctime = oa->o_ctime;
2379                         valid |= CAT_CTIME;
2380                 }
2381
2382                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2383                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2384                         loff_t last_off = last->oap_count + last->oap_obj_off +
2385                                 last->oap_page_off;
2386
2387                         /* Change file size if this is an out of quota or
2388                          * direct IO write and it extends the file size */
2389                         if (loi->loi_lvb.lvb_size < last_off) {
2390                                 attr->cat_size = last_off;
2391                                 valid |= CAT_SIZE;
2392                         }
2393                         /* Extend KMS if it's not a lockless write */
2394                         if (loi->loi_kms < last_off &&
2395                             oap2osc_page(last)->ops_srvlock == 0) {
2396                                 attr->cat_kms = last_off;
2397                                 valid |= CAT_KMS;
2398                         }
2399                 }
2400
2401                 if (valid != 0)
2402                         cl_object_attr_update(env, obj, attr, valid);
2403                 cl_object_attr_unlock(obj);
2404         }
2405         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2406         aa->aa_oa = NULL;
2407
2408         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2409                 osc_inc_unstable_pages(req);
2410
2411         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2412                 list_del_init(&ext->oe_link);
2413                 osc_extent_finish(env, ext, 1,
2414                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2415         }
2416         LASSERT(list_empty(&aa->aa_exts));
2417         LASSERT(list_empty(&aa->aa_oaps));
2418
2419         transferred = (req->rq_bulk == NULL ? /* short io */
2420                        aa->aa_requested_nob :
2421                        req->rq_bulk->bd_nob_transferred);
2422
2423         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2424         ptlrpc_lprocfs_brw(req, transferred);
2425
2426         spin_lock(&cli->cl_loi_list_lock);
2427         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2428          * is called so we know whether to go to sync BRWs or wait for more
2429          * RPCs to complete */
2430         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2431                 cli->cl_w_in_flight--;
2432         else
2433                 cli->cl_r_in_flight--;
2434         osc_wake_cache_waiters(cli);
2435         spin_unlock(&cli->cl_loi_list_lock);
2436
2437         osc_io_unplug(env, cli, NULL);
2438         RETURN(rc);
2439 }
2440
2441 static void brw_commit(struct ptlrpc_request *req)
2442 {
2443         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2444          * this called via the rq_commit_cb, I need to ensure
2445          * osc_dec_unstable_pages is still called. Otherwise unstable
2446          * pages may be leaked. */
2447         spin_lock(&req->rq_lock);
2448         if (likely(req->rq_unstable)) {
2449                 req->rq_unstable = 0;
2450                 spin_unlock(&req->rq_lock);
2451
2452                 osc_dec_unstable_pages(req);
2453         } else {
2454                 req->rq_committed = 1;
2455                 spin_unlock(&req->rq_lock);
2456         }
2457 }
2458
2459 /**
2460  * Build an RPC by the list of extent @ext_list. The caller must ensure
2461  * that the total pages in this list are NOT over max pages per RPC.
2462  * Extents in the list must be in OES_RPC state.
      *
      * On success the pages and extents are spliced onto the new request's
      * async args and the request is handed to ptlrpcd.  On failure every
      * extent still on @ext_list is completed with the error through
      * osc_extent_finish().
      *
      * \param env       execution environment
      * \param cli       client obd the RPC is issued for
      * \param ext_list  non-empty list of extents to send; emptied on both
      *                  the success and the failure path
      * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE
      *
      * \retval 0        request queued
      * \retval -ENOMEM  the page array or the obdo could not be allocated
      * \retval other    error returned by osc_brw_prep_request()
2463  */
2464 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2465                   struct list_head *ext_list, int cmd)
2466 {
2467         struct ptlrpc_request           *req = NULL;
2468         struct osc_extent               *ext;
2469         struct brw_page                 **pga = NULL;
2470         struct osc_brw_async_args       *aa = NULL;
2471         struct obdo                     *oa = NULL;
2472         struct osc_async_page           *oap;
2473         struct osc_object               *obj = NULL;
2474         struct cl_req_attr              *crattr = NULL;
2475         loff_t                          starting_offset = OBD_OBJECT_EOF;
2476         loff_t                          ending_offset = 0;
2477         /* '1' for consistency with code that checks !mpflag to restore */
2478         int mpflag = 1;
2479         int                             mem_tight = 0;
2480         int                             page_count = 0;
2481         bool                            soft_sync = false;
2482         bool                            ndelay = false;
2483         int                             i;
2484         int                             grant = 0;
2485         int                             rc;
2486         __u32                           layout_version = 0;
2487         LIST_HEAD(rpc_list);
2488         struct ost_body                 *body;
2489         ENTRY;
2490         LASSERT(!list_empty(ext_list));
2491
2492         /* add pages into rpc_list to build BRW rpc */
             /* First pass: accumulate the totals over all extents (memalloc
              * flag, grants, page count, highest layout version) and remember
              * the object of the first extent. */
2493         list_for_each_entry(ext, ext_list, oe_link) {
2494                 LASSERT(ext->oe_state == OES_RPC);
2495                 mem_tight |= ext->oe_memalloc;
2496                 grant += ext->oe_grants;
2497                 page_count += ext->oe_nr_pages;
2498                 layout_version = max(layout_version, ext->oe_layout_version);
2499                 if (obj == NULL)
2500                         obj = ext->oe_obj;
2501         }
2502
2503         soft_sync = osc_over_unstable_soft_limit(cli);
             /* saved state is restored at the "out" label on every path */
2504         if (mem_tight)
2505                 mpflag = memalloc_noreclaim_save();
2506
2507         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2508         if (pga == NULL)
2509                 GOTO(out, rc = -ENOMEM);
2510
2511         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2512         if (oa == NULL)
2513                 GOTO(out, rc = -ENOMEM);
2514
             /* Second pass: fill pga[] with every page of every extent, link
              * the pages onto rpc_list and track the byte range covered. */
2515         i = 0;
2516         list_for_each_entry(ext, ext_list, oe_link) {
2517                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2518                         if (mem_tight)
2519                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2520                         if (soft_sync)
2521                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2522                         pga[i] = &oap->oap_brw_page;
2523                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2524                         i++;
2525
2526                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* a page that does not lower the starting offset
                              * must begin at offset 0 within the page */
2527                         if (starting_offset == OBD_OBJECT_EOF ||
2528                             starting_offset > oap->oap_obj_off)
2529                                 starting_offset = oap->oap_obj_off;
2530                         else
2531                                 LASSERT(oap->oap_page_off == 0);
                             /* a page that does not raise the ending offset
                              * must run to the end of the page */
2532                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2533                                 ending_offset = oap->oap_obj_off +
2534                                                 oap->oap_count;
2535                         else
2536                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2537                                         PAGE_SIZE);
2538                 }
                     /* one no-delay extent makes the whole RPC no-delay */
2539                 if (ext->oe_ndelay)
2540                         ndelay = true;
2541         }
2542
2543         /* first page in the list */
2544         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2545
             /* let the upper layers fill in the obdo for this request */
2546         crattr = &osc_env_info(env)->oti_req_attr;
2547         memset(crattr, 0, sizeof(*crattr));
2548         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2549         crattr->cra_flags = ~0ULL;
2550         crattr->cra_page = oap2cl_page(oap);
2551         crattr->cra_oa = oa;
2552         cl_req_attr_set(env, osc2cl(obj), crattr);
2553
2554         if (cmd == OBD_BRW_WRITE) {
2555                 oa->o_grant_used = grant;
2556                 if (layout_version > 0) {
2557                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2558                                PFID(&oa->o_oi.oi_fid), layout_version);
2559
2560                         oa->o_layout_version = layout_version;
2561                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2562                 }
2563         }
2564
2565         sort_brw_pages(pga, page_count);
2566         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2567         if (rc != 0) {
2568                 CERROR("prep_req failed: %d\n", rc);
2569                 GOTO(out, rc);
2570         }
2571
2572         req->rq_commit_cb = brw_commit;
2573         req->rq_interpret_reply = brw_interpret;
2574         req->rq_memalloc = mem_tight != 0;
             /* the first page of the RPC holds a reference on the request */
2575         oap->oap_request = ptlrpc_request_addref(req);
2576         if (ndelay) {
2577                 req->rq_no_resend = req->rq_no_delay = 1;
2578                 /* probably set a shorter timeout value.
2579                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2580                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2581         }
2582
2583         /* Need to update the timestamps after the request is built in case
2584          * we race with setattr (locally or in queue at OST).  If OST gets
2585          * later setattr before earlier BRW (as determined by the request xid),
2586          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2587          * way to do this in a single call.  bug 10150 */
2588         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2589         crattr->cra_oa = &body->oa;
2590         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2591         cl_req_attr_set(env, osc2cl(obj), crattr);
2592         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2593
             /* hand the page and extent lists over to the request's async
              * args; both source lists are left empty */
2594         aa = ptlrpc_req_async_args(aa, req);
2595         INIT_LIST_HEAD(&aa->aa_oaps);
2596         list_splice_init(&rpc_list, &aa->aa_oaps);
2597         INIT_LIST_HEAD(&aa->aa_exts);
2598         list_splice_init(ext_list, &aa->aa_exts);
2599
             /* account the RPC as in flight and feed the per-client
              * page-count, in-flight and offset histograms */
2600         spin_lock(&cli->cl_loi_list_lock);
2601         starting_offset >>= PAGE_SHIFT;
2602         if (cmd == OBD_BRW_READ) {
2603                 cli->cl_r_in_flight++;
2604                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2605                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2606                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2607                                       starting_offset + 1);
2608         } else {
2609                 cli->cl_w_in_flight++;
2610                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2611                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2612                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2613                                       starting_offset + 1);
2614         }
2615         spin_unlock(&cli->cl_loi_list_lock);
2616
2617         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2618                   page_count, aa, cli->cl_r_in_flight,
2619                   cli->cl_w_in_flight);
2620         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2621
2622         ptlrpcd_add_req(req);
2623         rc = 0;
2624         EXIT;
2625
2626 out:
2627         if (mem_tight)
2628                 memalloc_noreclaim_restore(mpflag);
2629
2630         if (rc != 0) {
                     /* NOTE(review): relies on osc_brw_prep_request() leaving
                      * req NULL when it fails -- confirm */
2631                 LASSERT(req == NULL);
2632
2633                 if (oa)
2634                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2635                 if (pga) {
2636                         osc_release_bounce_pages(pga, page_count);
2637                         osc_release_ppga(pga, page_count);
2638                 }
2639                 /* this should happen rarely and is pretty bad, it makes the
2640                  * pending list not follow the dirty order */
2641                 while (!list_empty(ext_list)) {
2642                         ext = list_entry(ext_list->next, struct osc_extent,
2643                                          oe_link);
2644                         list_del_init(&ext->oe_link);
2645                         osc_extent_finish(env, ext, 0, rc);
2646                 }
2647         }
2648         RETURN(rc);
2649 }
2650
2651 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2652 {
2653         int set = 0;
2654
2655         LASSERT(lock != NULL);
2656
2657         lock_res_and_lock(lock);
2658
2659         if (lock->l_ast_data == NULL)
2660                 lock->l_ast_data = data;
2661         if (lock->l_ast_data == data)
2662                 set = 1;
2663
2664         unlock_res_and_lock(lock);
2665
2666         return set;
2667 }
2668
2669 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2670                      void *cookie, struct lustre_handle *lockh,
2671                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2672                      int errcode)
2673 {
2674         bool intent = *flags & LDLM_FL_HAS_INTENT;
2675         int rc;
2676         ENTRY;
2677
2678         /* The request was created before ldlm_cli_enqueue call. */
2679         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2680                 struct ldlm_reply *rep;
2681
2682                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2683                 LASSERT(rep != NULL);
2684
2685                 rep->lock_policy_res1 =
2686                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2687                 if (rep->lock_policy_res1)
2688                         errcode = rep->lock_policy_res1;
2689                 if (!speculative)
2690                         *flags |= LDLM_FL_LVB_READY;
2691         } else if (errcode == ELDLM_OK) {
2692                 *flags |= LDLM_FL_LVB_READY;
2693         }
2694
2695         /* Call the update callback. */
2696         rc = (*upcall)(cookie, lockh, errcode);
2697
2698         /* release the reference taken in ldlm_cli_enqueue() */
2699         if (errcode == ELDLM_LOCK_MATCHED)
2700                 errcode = ELDLM_OK;
2701         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2702                 ldlm_lock_decref(lockh, mode);
2703
2704         RETURN(rc);
2705 }
2706
2707 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2708                           void *args, int rc)
2709 {
2710         struct osc_enqueue_args *aa = args;
2711         struct ldlm_lock *lock;
2712         struct lustre_handle *lockh = &aa->oa_lockh;
2713         enum ldlm_mode mode = aa->oa_mode;
2714         struct ost_lvb *lvb = aa->oa_lvb;
2715         __u32 lvb_len = sizeof(*lvb);
2716         __u64 flags = 0;
2717         struct ldlm_enqueue_info einfo = {
2718                 .ei_type = aa->oa_type,
2719                 .ei_mode = mode,
2720         };
2721
2722         ENTRY;
2723
2724         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2725          * be valid. */
2726         lock = ldlm_handle2lock(lockh);
2727         LASSERTF(lock != NULL,
2728                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2729                  lockh->cookie, req, aa);
2730
2731         /* Take an additional reference so that a blocking AST that
2732          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2733          * to arrive after an upcall has been executed by
2734          * osc_enqueue_fini(). */
2735         ldlm_lock_addref(lockh, mode);
2736
2737         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2738         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2739
2740         /* Let CP AST to grant the lock first. */
2741         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2742
2743         if (aa->oa_speculative) {
2744                 LASSERT(aa->oa_lvb == NULL);
2745                 LASSERT(aa->oa_flags == NULL);
2746                 aa->oa_flags = &flags;
2747         }
2748
2749         /* Complete obtaining the lock procedure. */
2750         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2751                                    lvb, lvb_len, lockh, rc);
2752         /* Complete osc stuff. */
2753         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2754                               aa->oa_flags, aa->oa_speculative, rc);
2755
2756         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2757
2758         ldlm_lock_decref(lockh, mode);
2759         LDLM_LOCK_PUT(lock);
2760         RETURN(rc);
2761 }
2762
2763 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2764  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2765  * other synchronous requests, however keeping some locks and trying to obtain
2766  * others may take a considerable amount of time in a case of ost failure; and
2767  * when other sync requests do not get released lock from a client, the client
2768  * is evicted from the cluster -- such scenarious make the life difficult, so
2769  * release locks just after they are obtained. */
2770 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2771                      __u64 *flags, union ldlm_policy_data *policy,
2772                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2773                      void *cookie, struct ldlm_enqueue_info *einfo,
2774                      struct ptlrpc_request_set *rqset, int async,
2775                      bool speculative)
2776 {
2777         struct obd_device *obd = exp->exp_obd;
2778         struct lustre_handle lockh = { 0 };
2779         struct ptlrpc_request *req = NULL;
2780         int intent = *flags & LDLM_FL_HAS_INTENT;
2781         __u64 match_flags = *flags;
2782         enum ldlm_mode mode;
2783         int rc;
2784         ENTRY;
2785
2786         /* Filesystem lock extents are extended to page boundaries so that
2787          * dealing with the page cache is a little smoother.  */
2788         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2789         policy->l_extent.end |= ~PAGE_MASK;
2790
2791         /* Next, search for already existing extent locks that will cover us */
2792         /* If we're trying to read, we also search for an existing PW lock.  The
2793          * VFS and page cache already protect us locally, so lots of readers/
2794          * writers can share a single PW lock.
2795          *
2796          * There are problems with conversion deadlocks, so instead of
2797          * converting a read lock to a write lock, we'll just enqueue a new
2798          * one.
2799          *
2800          * At some point we should cancel the read lock instead of making them
2801          * send us a blocking callback, but there are problems with canceling
2802          * locks out from other users right now, too. */
2803         mode = einfo->ei_mode;
2804         if (einfo->ei_mode == LCK_PR)
2805                 mode |= LCK_PW;
2806         /* Normal lock requests must wait for the LVB to be ready before
2807          * matching a lock; speculative lock requests do not need to,
2808          * because they will not actually use the lock. */
2809         if (!speculative)
2810                 match_flags |= LDLM_FL_LVB_READY;
2811         if (intent != 0)
2812                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2813         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2814                                einfo->ei_type, policy, mode, &lockh);
2815         if (mode) {
2816                 struct ldlm_lock *matched;
2817
2818                 if (*flags & LDLM_FL_TEST_LOCK)
2819                         RETURN(ELDLM_OK);
2820
2821                 matched = ldlm_handle2lock(&lockh);
2822                 if (speculative) {
2823                         /* This DLM lock request is speculative, and does not
2824                          * have an associated IO request. Therefore if there
2825                          * is already a DLM lock, it wll just inform the
2826                          * caller to cancel the request for this stripe.*/
2827                         lock_res_and_lock(matched);
2828                         if (ldlm_extent_equal(&policy->l_extent,
2829                             &matched->l_policy_data.l_extent))
2830                                 rc = -EEXIST;
2831                         else
2832                                 rc = -ECANCELED;
2833                         unlock_res_and_lock(matched);
2834
2835                         ldlm_lock_decref(&lockh, mode);
2836                         LDLM_LOCK_PUT(matched);
2837                         RETURN(rc);
2838                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2839                         *flags |= LDLM_FL_LVB_READY;
2840
2841                         /* We already have a lock, and it's referenced. */
2842                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2843
2844                         ldlm_lock_decref(&lockh, mode);
2845                         LDLM_LOCK_PUT(matched);
2846                         RETURN(ELDLM_OK);
2847                 } else {
2848                         ldlm_lock_decref(&lockh, mode);
2849                         LDLM_LOCK_PUT(matched);
2850                 }
2851         }
2852
2853         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2854                 RETURN(-ENOLCK);
2855
2856         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2857         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2858
2859         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2860                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2861         if (async) {
2862                 if (!rc) {
2863                         struct osc_enqueue_args *aa;
2864                         aa = ptlrpc_req_async_args(aa, req);
2865                         aa->oa_exp         = exp;
2866                         aa->oa_mode        = einfo->ei_mode;
2867                         aa->oa_type        = einfo->ei_type;
2868                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2869                         aa->oa_upcall      = upcall;
2870                         aa->oa_cookie      = cookie;
2871                         aa->oa_speculative = speculative;
2872                         if (!speculative) {
2873                                 aa->oa_flags  = flags;
2874                                 aa->oa_lvb    = lvb;
2875                         } else {
2876                                 /* speculative locks are essentially to enqueue
2877                                  * a DLM lock  in advance, so we don't care
2878                                  * about the result of the enqueue. */
2879                                 aa->oa_lvb    = NULL;
2880                                 aa->oa_flags  = NULL;
2881                         }
2882
2883                         req->rq_interpret_reply = osc_enqueue_interpret;
2884                         ptlrpc_set_add_req(rqset, req);
2885                 }
2886                 RETURN(rc);
2887         }
2888
2889         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2890                               flags, speculative, rc);
2891
2892         RETURN(rc);
2893 }
2894
2895 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2896                    struct ldlm_res_id *res_id, enum ldlm_type type,
2897                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2898                    __u64 *flags, struct osc_object *obj,
2899                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2900 {
2901         struct obd_device *obd = exp->exp_obd;
2902         __u64 lflags = *flags;
2903         enum ldlm_mode rc;
2904         ENTRY;
2905
2906         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2907                 RETURN(-EIO);
2908
2909         /* Filesystem lock extents are extended to page boundaries so that
2910          * dealing with the page cache is a little smoother */
2911         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2912         policy->l_extent.end |= ~PAGE_MASK;
2913
2914         /* Next, search for already existing extent locks that will cover us */
2915         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2916                                         res_id, type, policy, mode, lockh,
2917                                         match_flags);
2918         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2919                 RETURN(rc);
2920
2921         if (obj != NULL) {
2922                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2923
2924                 LASSERT(lock != NULL);
2925                 if (osc_set_lock_data(lock, obj)) {
2926                         lock_res_and_lock(lock);
2927                         if (!ldlm_is_lvb_cached(lock)) {
2928                                 LASSERT(lock->l_ast_data == obj);
2929                                 osc_lock_lvb_update(env, obj, lock, NULL);
2930                                 ldlm_set_lvb_cached(lock);
2931                         }
2932                         unlock_res_and_lock(lock);
2933                 } else {
2934                         ldlm_lock_decref(lockh, rc);
2935                         rc = 0;
2936                 }
2937                 LDLM_LOCK_PUT(lock);
2938         }
2939         RETURN(rc);
2940 }
2941
2942 static int osc_statfs_interpret(const struct lu_env *env,
2943                                 struct ptlrpc_request *req, void *args, int rc)
2944 {
2945         struct osc_async_args *aa = args;
2946         struct obd_statfs *msfs;
2947
2948         ENTRY;
2949         if (rc == -EBADR)
2950                 /*
2951                  * The request has in fact never been sent due to issues at
2952                  * a higher level (LOV).  Exit immediately since the caller
2953                  * is aware of the problem and takes care of the clean up.
2954                  */
2955                 RETURN(rc);
2956
2957         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2958             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2959                 GOTO(out, rc = 0);
2960
2961         if (rc != 0)
2962                 GOTO(out, rc);
2963
2964         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2965         if (msfs == NULL)
2966                 GOTO(out, rc = -EPROTO);
2967
2968         *aa->aa_oi->oi_osfs = *msfs;
2969 out:
2970         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2971
2972         RETURN(rc);
2973 }
2974
2975 static int osc_statfs_async(struct obd_export *exp,
2976                             struct obd_info *oinfo, time64_t max_age,
2977                             struct ptlrpc_request_set *rqset)
2978 {
2979         struct obd_device     *obd = class_exp2obd(exp);
2980         struct ptlrpc_request *req;
2981         struct osc_async_args *aa;
2982         int rc;
2983         ENTRY;
2984
2985         if (obd->obd_osfs_age >= max_age) {
2986                 CDEBUG(D_SUPER,
2987                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2988                        obd->obd_name, &obd->obd_osfs,
2989                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2990                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2991                 spin_lock(&obd->obd_osfs_lock);
2992                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2993                 spin_unlock(&obd->obd_osfs_lock);
2994                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2995                 if (oinfo->oi_cb_up)
2996                         oinfo->oi_cb_up(oinfo, 0);
2997
2998                 RETURN(0);
2999         }
3000
3001         /* We could possibly pass max_age in the request (as an absolute
3002          * timestamp or a "seconds.usec ago") so the target can avoid doing
3003          * extra calls into the filesystem if that isn't necessary (e.g.
3004          * during mount that would help a bit).  Having relative timestamps
3005          * is not so great if request processing is slow, while absolute
3006          * timestamps are not ideal because they need time synchronization. */
3007         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3008         if (req == NULL)
3009                 RETURN(-ENOMEM);
3010
3011         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3012         if (rc) {
3013                 ptlrpc_request_free(req);
3014                 RETURN(rc);
3015         }
3016         ptlrpc_request_set_replen(req);
3017         req->rq_request_portal = OST_CREATE_PORTAL;
3018         ptlrpc_at_set_req_timeout(req);
3019
3020         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3021                 /* procfs requests not want stat in wait for avoid deadlock */
3022                 req->rq_no_resend = 1;
3023                 req->rq_no_delay = 1;
3024         }
3025
3026         req->rq_interpret_reply = osc_statfs_interpret;
3027         aa = ptlrpc_req_async_args(aa, req);
3028         aa->aa_oi = oinfo;
3029
3030         ptlrpc_set_add_req(rqset, req);
3031         RETURN(0);
3032 }
3033
3034 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3035                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3036 {
3037         struct obd_device     *obd = class_exp2obd(exp);
3038         struct obd_statfs     *msfs;
3039         struct ptlrpc_request *req;
3040         struct obd_import     *imp = NULL;
3041         int rc;
3042         ENTRY;
3043
3044
3045         /*Since the request might also come from lprocfs, so we need
3046          *sync this with client_disconnect_export Bug15684*/
3047         down_read(&obd->u.cli.cl_sem);
3048         if (obd->u.cli.cl_import)
3049                 imp = class_import_get(obd->u.cli.cl_import);
3050         up_read(&obd->u.cli.cl_sem);
3051         if (!imp)
3052                 RETURN(-ENODEV);
3053
3054         /* We could possibly pass max_age in the request (as an absolute
3055          * timestamp or a "seconds.usec ago") so the target can avoid doing
3056          * extra calls into the filesystem if that isn't necessary (e.g.
3057          * during mount that would help a bit).  Having relative timestamps
3058          * is not so great if request processing is slow, while absolute
3059          * timestamps are not ideal because they need time synchronization. */
3060         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3061
3062         class_import_put(imp);
3063
3064         if (req == NULL)
3065                 RETURN(-ENOMEM);
3066
3067         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3068         if (rc) {
3069                 ptlrpc_request_free(req);
3070                 RETURN(rc);
3071         }
3072         ptlrpc_request_set_replen(req);
3073         req->rq_request_portal = OST_CREATE_PORTAL;
3074         ptlrpc_at_set_req_timeout(req);
3075
3076         if (flags & OBD_STATFS_NODELAY) {
3077                 /* procfs requests not want stat in wait for avoid deadlock */
3078                 req->rq_no_resend = 1;
3079                 req->rq_no_delay = 1;
3080         }
3081
3082         rc = ptlrpc_queue_wait(req);
3083         if (rc)
3084                 GOTO(out, rc);
3085
3086         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3087         if (msfs == NULL)
3088                 GOTO(out, rc = -EPROTO);
3089
3090         *osfs = *msfs;
3091
3092         EXIT;
3093 out:
3094         ptlrpc_req_finished(req);
3095         return rc;
3096 }
3097
3098 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3099                          void *karg, void __user *uarg)
3100 {
3101         struct obd_device *obd = exp->exp_obd;
3102         struct obd_ioctl_data *data = karg;
3103         int rc = 0;
3104
3105         ENTRY;
3106         if (!try_module_get(THIS_MODULE)) {
3107                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3108                        module_name(THIS_MODULE));
3109                 return -EINVAL;
3110         }
3111         switch (cmd) {
3112         case OBD_IOC_CLIENT_RECOVER:
3113                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3114                                            data->ioc_inlbuf1, 0);
3115                 if (rc > 0)
3116                         rc = 0;
3117                 break;
3118         case IOC_OSC_SET_ACTIVE:
3119                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3120                                               data->ioc_offset);
3121                 break;
3122         default:
3123                 rc = -ENOTTY;
3124                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3125                        obd->obd_name, cmd, current->comm, rc);
3126                 break;
3127         }
3128
3129         module_put(THIS_MODULE);
3130         return rc;
3131 }
3132
3133 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3134                        u32 keylen, void *key, u32 vallen, void *val,
3135                        struct ptlrpc_request_set *set)
3136 {
3137         struct ptlrpc_request *req;
3138         struct obd_device     *obd = exp->exp_obd;
3139         struct obd_import     *imp = class_exp2cliimp(exp);
3140         char                  *tmp;
3141         int                    rc;
3142         ENTRY;
3143
3144         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3145
3146         if (KEY_IS(KEY_CHECKSUM)) {
3147                 if (vallen != sizeof(int))
3148                         RETURN(-EINVAL);
3149                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3150                 RETURN(0);
3151         }
3152
3153         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3154                 sptlrpc_conf_client_adapt(obd);
3155                 RETURN(0);
3156         }
3157
3158         if (KEY_IS(KEY_FLUSH_CTX)) {
3159                 sptlrpc_import_flush_my_ctx(imp);
3160                 RETURN(0);
3161         }
3162
3163         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3164                 struct client_obd *cli = &obd->u.cli;
3165                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3166                 long target = *(long *)val;
3167
3168                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3169                 *(long *)val -= nr;
3170                 RETURN(0);
3171         }
3172
3173         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3174                 RETURN(-EINVAL);
3175
3176         /* We pass all other commands directly to OST. Since nobody calls osc
3177            methods directly and everybody is supposed to go through LOV, we
3178            assume lov checked invalid values for us.
3179            The only recognised values so far are evict_by_nid and mds_conn.
3180            Even if something bad goes through, we'd get a -EINVAL from OST
3181            anyway. */
3182
3183         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3184                                                 &RQF_OST_SET_GRANT_INFO :
3185                                                 &RQF_OBD_SET_INFO);
3186         if (req == NULL)
3187                 RETURN(-ENOMEM);
3188
3189         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3190                              RCL_CLIENT, keylen);
3191         if (!KEY_IS(KEY_GRANT_SHRINK))
3192                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3193                                      RCL_CLIENT, vallen);
3194         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3195         if (rc) {
3196                 ptlrpc_request_free(req);
3197                 RETURN(rc);
3198         }
3199
3200         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3201         memcpy(tmp, key, keylen);
3202         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3203                                                         &RMF_OST_BODY :
3204                                                         &RMF_SETINFO_VAL);
3205         memcpy(tmp, val, vallen);
3206
3207         if (KEY_IS(KEY_GRANT_SHRINK)) {
3208                 struct osc_grant_args *aa;
3209                 struct obdo *oa;
3210
3211                 aa = ptlrpc_req_async_args(aa, req);
3212                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3213                 if (!oa) {
3214                         ptlrpc_req_finished(req);
3215                         RETURN(-ENOMEM);
3216                 }
3217                 *oa = ((struct ost_body *)val)->oa;
3218                 aa->aa_oa = oa;
3219                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3220         }
3221
3222         ptlrpc_request_set_replen(req);
3223         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3224                 LASSERT(set != NULL);
3225                 ptlrpc_set_add_req(set, req);
3226                 ptlrpc_check_set(NULL, set);
3227         } else {
3228                 ptlrpcd_add_req(req);
3229         }
3230
3231         RETURN(0);
3232 }
3233 EXPORT_SYMBOL(osc_set_info_async);
3234
3235 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3236                   struct obd_device *obd, struct obd_uuid *cluuid,
3237                   struct obd_connect_data *data, void *localdata)
3238 {
3239         struct client_obd *cli = &obd->u.cli;
3240
3241         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3242                 long lost_grant;
3243                 long grant;
3244
3245                 spin_lock(&cli->cl_loi_list_lock);
3246                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3247                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3248                         /* restore ocd_grant_blkbits as client page bits */
3249                         data->ocd_grant_blkbits = PAGE_SHIFT;
3250                         grant += cli->cl_dirty_grant;
3251                 } else {
3252                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3253                 }
3254                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3255                 lost_grant = cli->cl_lost_grant;
3256                 cli->cl_lost_grant = 0;
3257                 spin_unlock(&cli->cl_loi_list_lock);
3258
3259                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3260                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3261                        data->ocd_version, data->ocd_grant, lost_grant);
3262         }
3263
3264         RETURN(0);
3265 }
3266 EXPORT_SYMBOL(osc_reconnect);
3267
3268 int osc_disconnect(struct obd_export *exp)
3269 {
3270         struct obd_device *obd = class_exp2obd(exp);
3271         int rc;
3272
3273         rc = client_disconnect_export(exp);
3274         /**
3275          * Initially we put del_shrink_grant before disconnect_export, but it
3276          * causes the following problem if setup (connect) and cleanup
3277          * (disconnect) are tangled together.
3278          *      connect p1                     disconnect p2
3279          *   ptlrpc_connect_import
3280          *     ...............               class_manual_cleanup
3281          *                                     osc_disconnect
3282          *                                     del_shrink_grant
3283          *   ptlrpc_connect_interrupt
3284          *     osc_init_grant
3285          *   add this client to shrink list
3286          *                                      cleanup_osc
3287          * Bang! grant shrink thread trigger the shrink. BUG18662
3288          */
3289         osc_del_grant_list(&obd->u.cli);
3290         return rc;
3291 }
3292 EXPORT_SYMBOL(osc_disconnect);
3293
3294 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3295                                  struct hlist_node *hnode, void *arg)
3296 {
3297         struct lu_env *env = arg;
3298         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3299         struct ldlm_lock *lock;
3300         struct osc_object *osc = NULL;
3301         ENTRY;
3302
3303         lock_res(res);
3304         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3305                 if (lock->l_ast_data != NULL && osc == NULL) {
3306                         osc = lock->l_ast_data;
3307                         cl_object_get(osc2cl(osc));
3308                 }
3309
3310                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3311                  * by the 2nd round of ldlm_namespace_clean() call in
3312                  * osc_import_event(). */
3313                 ldlm_clear_cleaned(lock);
3314         }
3315         unlock_res(res);
3316
3317         if (osc != NULL) {
3318                 osc_object_invalidate(env, osc);
3319                 cl_object_put(env, osc2cl(osc));
3320         }
3321
3322         RETURN(0);
3323 }
3324 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3325
3326 static int osc_import_event(struct obd_device *obd,
3327                             struct obd_import *imp,
3328                             enum obd_import_event event)
3329 {
3330         struct client_obd *cli;
3331         int rc = 0;
3332
3333         ENTRY;
3334         LASSERT(imp->imp_obd == obd);
3335
3336         switch (event) {
3337         case IMP_EVENT_DISCON: {
3338                 cli = &obd->u.cli;
3339                 spin_lock(&cli->cl_loi_list_lock);
3340                 cli->cl_avail_grant = 0;
3341                 cli->cl_lost_grant = 0;
3342                 spin_unlock(&cli->cl_loi_list_lock);
3343                 break;
3344         }
3345         case IMP_EVENT_INACTIVE: {
3346                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3347                 break;
3348         }
3349         case IMP_EVENT_INVALIDATE: {
3350                 struct ldlm_namespace *ns = obd->obd_namespace;
3351                 struct lu_env         *env;
3352                 __u16                  refcheck;
3353
3354                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3355
3356                 env = cl_env_get(&refcheck);
3357                 if (!IS_ERR(env)) {
3358                         osc_io_unplug(env, &obd->u.cli, NULL);
3359
3360                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3361                                                  osc_ldlm_resource_invalidate,
3362                                                  env, 0);
3363                         cl_env_put(env, &refcheck);
3364
3365                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3366                 } else
3367                         rc = PTR_ERR(env);
3368                 break;
3369         }
3370         case IMP_EVENT_ACTIVE: {
3371                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3372                 break;
3373         }
3374         case IMP_EVENT_OCD: {
3375                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3376
3377                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3378                         osc_init_grant(&obd->u.cli, ocd);
3379
3380                 /* See bug 7198 */
3381                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3382                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3383
3384                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3385                 break;
3386         }
3387         case IMP_EVENT_DEACTIVATE: {
3388                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3389                 break;
3390         }
3391         case IMP_EVENT_ACTIVATE: {
3392                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3393                 break;
3394         }
3395         default:
3396                 CERROR("Unknown import event %d\n", event);
3397                 LBUG();
3398         }
3399         RETURN(rc);
3400 }
3401
3402 /**
3403  * Determine whether the lock can be canceled before replaying the lock
3404  * during recovery, see bug16774 for detailed information.
3405  *
3406  * \retval zero the lock can't be canceled
3407  * \retval other ok to cancel
3408  */
3409 static int osc_cancel_weight(struct ldlm_lock *lock)
3410 {
3411         /*
3412          * Cancel all unused and granted extent lock.
3413          */
3414         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3415             ldlm_is_granted(lock) &&
3416             osc_ldlm_weigh_ast(lock) == 0)
3417                 RETURN(1);
3418
3419         RETURN(0);
3420 }
3421
3422 static int brw_queue_work(const struct lu_env *env, void *data)
3423 {
3424         struct client_obd *cli = data;
3425
3426         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3427
3428         osc_io_unplug(env, cli, NULL);
3429         RETURN(0);
3430 }
3431
3432 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3433 {
3434         struct client_obd *cli = &obd->u.cli;
3435         void *handler;
3436         int rc;
3437
3438         ENTRY;
3439
3440         rc = ptlrpcd_addref();
3441         if (rc)
3442                 RETURN(rc);
3443
3444         rc = client_obd_setup(obd, lcfg);
3445         if (rc)
3446                 GOTO(out_ptlrpcd, rc);
3447
3448
3449         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3450         if (IS_ERR(handler))
3451                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3452         cli->cl_writeback_work = handler;
3453
3454         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3455         if (IS_ERR(handler))
3456                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3457         cli->cl_lru_work = handler;
3458
3459         rc = osc_quota_setup(obd);
3460         if (rc)
3461                 GOTO(out_ptlrpcd_work, rc);
3462
3463         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3464         osc_update_next_shrink(cli);
3465
3466         RETURN(rc);
3467
3468 out_ptlrpcd_work:
3469         if (cli->cl_writeback_work != NULL) {
3470                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3471                 cli->cl_writeback_work = NULL;
3472         }
3473         if (cli->cl_lru_work != NULL) {
3474                 ptlrpcd_destroy_work(cli->cl_lru_work);
3475                 cli->cl_lru_work = NULL;
3476         }
3477         client_obd_cleanup(obd);
3478 out_ptlrpcd:
3479         ptlrpcd_decref();
3480         RETURN(rc);
3481 }
3482 EXPORT_SYMBOL(osc_setup_common);
3483
3484 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3485 {
3486         struct client_obd *cli = &obd->u.cli;
3487         int                adding;
3488         int                added;
3489         int                req_count;
3490         int                rc;
3491
3492         ENTRY;
3493
3494         rc = osc_setup_common(obd, lcfg);
3495         if (rc < 0)
3496                 RETURN(rc);
3497
3498         rc = osc_tunables_init(obd);
3499         if (rc)
3500                 RETURN(rc);
3501
3502         /*
3503          * We try to control the total number of requests with a upper limit
3504          * osc_reqpool_maxreqcount. There might be some race which will cause
3505          * over-limit allocation, but it is fine.
3506          */
3507         req_count = atomic_read(&osc_pool_req_count);
3508         if (req_count < osc_reqpool_maxreqcount) {
3509                 adding = cli->cl_max_rpcs_in_flight + 2;
3510                 if (req_count + adding > osc_reqpool_maxreqcount)
3511                         adding = osc_reqpool_maxreqcount - req_count;
3512
3513                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3514                 atomic_add(added, &osc_pool_req_count);
3515         }
3516
3517         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3518
3519         spin_lock(&osc_shrink_lock);
3520         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3521         spin_unlock(&osc_shrink_lock);
3522         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3523         cli->cl_import->imp_idle_debug = D_HA;
3524
3525         RETURN(0);
3526 }
3527
3528 int osc_precleanup_common(struct obd_device *obd)
3529 {
3530         struct client_obd *cli = &obd->u.cli;
3531         ENTRY;
3532
3533         /* LU-464
3534          * for echo client, export may be on zombie list, wait for
3535          * zombie thread to cull it, because cli.cl_import will be
3536          * cleared in client_disconnect_export():
3537          *   class_export_destroy() -> obd_cleanup() ->
3538          *   echo_device_free() -> echo_client_cleanup() ->
3539          *   obd_disconnect() -> osc_disconnect() ->
3540          *   client_disconnect_export()
3541          */
3542         obd_zombie_barrier();
3543         if (cli->cl_writeback_work) {
3544                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3545                 cli->cl_writeback_work = NULL;
3546         }
3547
3548         if (cli->cl_lru_work) {
3549                 ptlrpcd_destroy_work(cli->cl_lru_work);
3550                 cli->cl_lru_work = NULL;
3551         }
3552
3553         obd_cleanup_client_import(obd);
3554         RETURN(0);
3555 }
3556 EXPORT_SYMBOL(osc_precleanup_common);
3557
3558 static int osc_precleanup(struct obd_device *obd)
3559 {
3560         ENTRY;
3561
3562         osc_precleanup_common(obd);
3563
3564         ptlrpc_lprocfs_unregister_obd(obd);
3565         RETURN(0);
3566 }
3567
3568 int osc_cleanup_common(struct obd_device *obd)
3569 {
3570         struct client_obd *cli = &obd->u.cli;
3571         int rc;
3572
3573         ENTRY;
3574
3575         spin_lock(&osc_shrink_lock);
3576         list_del(&cli->cl_shrink_list);
3577         spin_unlock(&osc_shrink_lock);
3578
3579         /* lru cleanup */
3580         if (cli->cl_cache != NULL) {
3581                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3582                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3583                 list_del_init(&cli->cl_lru_osc);
3584                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3585                 cli->cl_lru_left = NULL;
3586                 cl_cache_decref(cli->cl_cache);
3587                 cli->cl_cache = NULL;
3588         }
3589
3590         /* free memory of osc quota cache */
3591         osc_quota_cleanup(obd);
3592
3593         rc = client_obd_cleanup(obd);
3594
3595         ptlrpcd_decref();
3596         RETURN(rc);
3597 }
3598 EXPORT_SYMBOL(osc_cleanup_common);
3599
3600 static const struct obd_ops osc_obd_ops = {
3601         .o_owner                = THIS_MODULE,
3602         .o_setup                = osc_setup,
3603         .o_precleanup           = osc_precleanup,
3604         .o_cleanup              = osc_cleanup_common,
3605         .o_add_conn             = client_import_add_conn,
3606         .o_del_conn             = client_import_del_conn,
3607         .o_connect              = client_connect_import,
3608         .o_reconnect            = osc_reconnect,
3609         .o_disconnect           = osc_disconnect,
3610         .o_statfs               = osc_statfs,
3611         .o_statfs_async         = osc_statfs_async,
3612         .o_create               = osc_create,
3613         .o_destroy              = osc_destroy,
3614         .o_getattr              = osc_getattr,
3615         .o_setattr              = osc_setattr,
3616         .o_iocontrol            = osc_iocontrol,
3617         .o_set_info_async       = osc_set_info_async,
3618         .o_import_event         = osc_import_event,
3619         .o_quotactl             = osc_quotactl,
3620 };
3621
/* List of client obds added in osc_setup(), protected by osc_shrink_lock. */
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

/* Shrinker handle returned by set_shrinker() in osc_init(). */
static struct shrinker *osc_cache_shrinker;
3625
3626 #ifndef HAVE_SHRINKER_COUNT
3627 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3628 {
3629         struct shrink_control scv = {
3630                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3631                 .gfp_mask   = shrink_param(sc, gfp_mask)
3632         };
3633         (void)osc_cache_shrink_scan(shrinker, &scv);
3634
3635         return osc_cache_shrink_count(shrinker, &scv);
3636 }
3637 #endif
3638
3639 static int __init osc_init(void)
3640 {
3641         unsigned int reqpool_size;
3642         unsigned int reqsize;
3643         int rc;
3644         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3645                          osc_cache_shrink_count, osc_cache_shrink_scan);
3646         ENTRY;
3647
3648         /* print an address of _any_ initialized kernel symbol from this
3649          * module, to allow debugging with gdb that doesn't support data
3650          * symbols from modules.*/
3651         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3652
3653         rc = lu_kmem_init(osc_caches);
3654         if (rc)
3655                 RETURN(rc);
3656
3657         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3658                                  LUSTRE_OSC_NAME, &osc_device_type);
3659         if (rc)
3660                 GOTO(out_kmem, rc);
3661
3662         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3663
3664         /* This is obviously too much memory, only prevent overflow here */
3665         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3666                 GOTO(out_type, rc = -EINVAL);
3667
3668         reqpool_size = osc_reqpool_mem_max << 20;
3669
3670         reqsize = 1;
3671         while (reqsize < OST_IO_MAXREQSIZE)
3672                 reqsize = reqsize << 1;
3673
3674         /*
3675          * We don't enlarge the request count in OSC pool according to
3676          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3677          * tried after normal allocation failed. So a small OSC pool won't
3678          * cause much performance degression in most of cases.
3679          */
3680         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3681
3682         atomic_set(&osc_pool_req_count, 0);
3683         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3684                                           ptlrpc_add_rqs_to_pool);
3685
3686         if (osc_rq_pool == NULL)
3687                 GOTO(out_type, rc = -ENOMEM);
3688
3689         rc = osc_start_grant_work();
3690         if (rc != 0)
3691                 GOTO(out_req_pool, rc);
3692
3693         RETURN(rc);
3694
3695 out_req_pool:
3696         ptlrpc_free_rq_pool(osc_rq_pool);
3697 out_type:
3698         class_unregister_type(LUSTRE_OSC_NAME);
3699 out_kmem:
3700         lu_kmem_fini(osc_caches);
3701
3702         RETURN(rc);
3703 }
3704
3705 static void __exit osc_exit(void)
3706 {
3707         osc_stop_grant_work();
3708         remove_shrinker(osc_cache_shrinker);
3709         class_unregister_type(LUSTRE_OSC_NAME);
3710         lu_kmem_fini(osc_caches);
3711         ptlrpc_free_rq_pool(osc_rq_pool);
3712 }
3713
3714 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3715 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3716 MODULE_VERSION(LUSTRE_VERSION_STRING);
3717 MODULE_LICENSE("GPL");
3718
3719 module_init(osc_init);
3720 module_exit(osc_exit);