LU-14045 sec: fix O_DIRECT and encrypted files
lustre/osc/osc_request.c (fs/lustre-release.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51
52 #include "osc_internal.h"
53
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
57
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
60 module_param(osc_reqpool_mem_max, uint, 0444);
61
62 static int osc_idle_timeout = 20;
63 module_param(osc_idle_timeout, uint, 0644);
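/*
 * Both knobs above are regular kernel module parameters; a sketch of how
 * they could be set at module load time (the values here are examples only):
 *
 *   modprobe osc osc_reqpool_mem_max=10 osc_idle_timeout=60
 *
 * osc_reqpool_mem_max (mode 0444) is read-only via sysfs afterwards, while
 * osc_idle_timeout (mode 0644) stays writable at runtime under
 * /sys/module/osc/parameters/.
 */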
64
65 #define osc_grant_args osc_brw_async_args
66
67 struct osc_setattr_args {
68         struct obdo             *sa_oa;
69         obd_enqueue_update_f     sa_upcall;
70         void                    *sa_cookie;
71 };
72
73 struct osc_fsync_args {
74         struct osc_object       *fa_obj;
75         struct obdo             *fa_oa;
76         obd_enqueue_update_f    fa_upcall;
77         void                    *fa_cookie;
78 };
79
80 struct osc_ladvise_args {
81         struct obdo             *la_oa;
82         obd_enqueue_update_f     la_upcall;
83         void                    *la_cookie;
84 };
85
86 static void osc_release_ppga(struct brw_page **ppga, size_t count);
87 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
88                          void *data, int rc);
89
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 {
92         struct ost_body *body;
93
94         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
95         LASSERT(body);
96
97         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
98 }
99
100 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
101                        struct obdo *oa)
102 {
103         struct ptlrpc_request   *req;
104         struct ost_body         *body;
105         int                      rc;
106
107         ENTRY;
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109         if (req == NULL)
110                 RETURN(-ENOMEM);
111
112         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113         if (rc) {
114                 ptlrpc_request_free(req);
115                 RETURN(rc);
116         }
117
118         osc_pack_req_body(req, oa);
119
120         ptlrpc_request_set_replen(req);
121
122         rc = ptlrpc_queue_wait(req);
123         if (rc)
124                 GOTO(out, rc);
125
126         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127         if (body == NULL)
128                 GOTO(out, rc = -EPROTO);
129
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
132
133         oa->o_blksize = cli_brw_size(exp->exp_obd);
134         oa->o_valid |= OBD_MD_FLBLKSZ;
135
136         EXIT;
137 out:
138         ptlrpc_req_finished(req);
139
140         return rc;
141 }
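/*
 * In outline, osc_getattr() above follows the standard synchronous OSC RPC
 * pattern: allocate a request on the export's import, pack the obdo into the
 * OST body, wait with ptlrpc_queue_wait(), then unpack the reply body back
 * into the caller's obdo. osc_setattr() below is the same template with
 * OST_SETATTR in place of OST_GETATTR.
 */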
142
143 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
144                        struct obdo *oa)
145 {
146         struct ptlrpc_request   *req;
147         struct ost_body         *body;
148         int                      rc;
149
150         ENTRY;
151         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152
153         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154         if (req == NULL)
155                 RETURN(-ENOMEM);
156
157         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158         if (rc) {
159                 ptlrpc_request_free(req);
160                 RETURN(rc);
161         }
162
163         osc_pack_req_body(req, oa);
164
165         ptlrpc_request_set_replen(req);
166
167         rc = ptlrpc_queue_wait(req);
168         if (rc)
169                 GOTO(out, rc);
170
171         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172         if (body == NULL)
173                 GOTO(out, rc = -EPROTO);
174
175         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176
177         EXIT;
178 out:
179         ptlrpc_req_finished(req);
180
181         RETURN(rc);
182 }
183
184 static int osc_setattr_interpret(const struct lu_env *env,
185                                  struct ptlrpc_request *req, void *args, int rc)
186 {
187         struct osc_setattr_args *sa = args;
188         struct ost_body *body;
189
190         ENTRY;
191
192         if (rc != 0)
193                 GOTO(out, rc);
194
195         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196         if (body == NULL)
197                 GOTO(out, rc = -EPROTO);
198
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
200                              &body->oa);
201 out:
202         rc = sa->sa_upcall(sa->sa_cookie, rc);
203         RETURN(rc);
204 }
205
206 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
207                       obd_enqueue_update_f upcall, void *cookie,
208                       struct ptlrpc_request_set *rqset)
209 {
210         struct ptlrpc_request   *req;
211         struct osc_setattr_args *sa;
212         int                      rc;
213
214         ENTRY;
215
216         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217         if (req == NULL)
218                 RETURN(-ENOMEM);
219
220         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221         if (rc) {
222                 ptlrpc_request_free(req);
223                 RETURN(rc);
224         }
225
226         osc_pack_req_body(req, oa);
227
228         ptlrpc_request_set_replen(req);
229
230         /* do mds to ost setattr asynchronously */
231         if (!rqset) {
232                 /* Do not wait for response. */
233                 ptlrpcd_add_req(req);
234         } else {
235                 req->rq_interpret_reply = osc_setattr_interpret;
236
237                 sa = ptlrpc_req_async_args(sa, req);
238                 sa->sa_oa = oa;
239                 sa->sa_upcall = upcall;
240                 sa->sa_cookie = cookie;
241
242                 ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
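/*
 * Minimal usage sketch for the async path (caller names are hypothetical):
 *
 *   rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, rqset);
 *
 * With rqset == NULL the request is simply handed to ptlrpcd and no
 * interpret callback is installed, so the upcall is never invoked; with a
 * real rqset, osc_setattr_interpret() fires my_upcall(my_cookie, rc) once
 * the reply (or error) comes back.
 */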
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case.
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         ptlrpc_set_add_req(rqset, req);
328
329         RETURN(0);
330 }
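/*
 * Hypothetical caller sketch: for a single advice, fill a struct ladvise_hdr
 * with lah_count = 1 and one struct lu_ladvise in lah_advise[], then call
 * osc_ladvise_base() with rqset == NULL for a fire-and-forget hint, per the
 * comment above.
 */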
331
332 static int osc_create(const struct lu_env *env, struct obd_export *exp,
333                       struct obdo *oa)
334 {
335         struct ptlrpc_request *req;
336         struct ost_body       *body;
337         int                    rc;
338         ENTRY;
339
340         LASSERT(oa != NULL);
341         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
342         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343
344         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345         if (req == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349         if (rc) {
350                 ptlrpc_request_free(req);
351                 GOTO(out, rc);
352         }
353
354         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
355         LASSERT(body);
356
357         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358
359         ptlrpc_request_set_replen(req);
360
361         rc = ptlrpc_queue_wait(req);
362         if (rc)
363                 GOTO(out_req, rc);
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL)
367                 GOTO(out_req, rc = -EPROTO);
368
369         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371
372         oa->o_blksize = cli_brw_size(exp->exp_obd);
373         oa->o_valid |= OBD_MD_FLBLKSZ;
374
375         CDEBUG(D_HA, "transno: %lld\n",
376                lustre_msg_get_transno(req->rq_repmsg));
377 out_req:
378         ptlrpc_req_finished(req);
379 out:
380         RETURN(rc);
381 }
382
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384                    obd_enqueue_update_f upcall, void *cookie)
385 {
386         struct ptlrpc_request *req;
387         struct osc_setattr_args *sa;
388         struct obd_import *imp = class_exp2cliimp(exp);
389         struct ost_body *body;
390         int rc;
391
392         ENTRY;
393
394         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395         if (req == NULL)
396                 RETURN(-ENOMEM);
397
398         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399         if (rc < 0) {
400                 ptlrpc_request_free(req);
401                 RETURN(rc);
402         }
403
404         osc_set_io_portal(req);
405
406         ptlrpc_at_set_req_timeout(req);
407
408         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409
410         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411
412         ptlrpc_request_set_replen(req);
413
414         req->rq_interpret_reply = osc_setattr_interpret;
415         sa = ptlrpc_req_async_args(sa, req);
416         sa->sa_oa = oa;
417         sa->sa_upcall = upcall;
418         sa->sa_cookie = cookie;
419
420         ptlrpcd_add_req(req);
421
422         RETURN(0);
423 }
424 EXPORT_SYMBOL(osc_punch_send);
425
426 /**
427  * osc_fallocate_base() - Handles fallocate requests.
428  *
429  * @exp:        Export structure
430  * @oa:         Attributes passed to OSS from client (obdo structure)
431  * @upcall:     Upcall function called when the request completes
432  * @cookie:     Opaque cookie passed back to @upcall
433  * @mode:       Operation done on given range.
434  *
435  * Handles fallocate requests only. Only block allocation or the
436  * standard preallocate operation (mode == 0 or FALLOC_FL_KEEP_SIZE)
437  * is supported currently. Other mode flags are not supported yet.
438  * ftruncate(2) or truncate(2) is supported via a SETATTR request
439  * instead.
440  *
441  * Return: Non-zero on failure and 0 on success.
442  */
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444                        obd_enqueue_update_f upcall, void *cookie, int mode)
445 {
446         struct ptlrpc_request *req;
447         struct osc_setattr_args *sa;
448         struct ost_body *body;
449         struct obd_import *imp = class_exp2cliimp(exp);
450         int rc;
451         ENTRY;
452
453         /*
454          * Only mode == 0 or FALLOC_FL_KEEP_SIZE (both standard prealloc)
455          * is supported now. Punch is not supported yet.
456          */
457         if (mode & ~FALLOC_FL_KEEP_SIZE)
458                 RETURN(-EOPNOTSUPP);
459         oa->o_falloc_mode = mode;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_OST_FALLOCATE);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
467         if (rc != 0) {
468                 ptlrpc_request_free(req);
469                 RETURN(rc);
470         }
471
472         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
473         LASSERT(body);
474
475         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
476
477         ptlrpc_request_set_replen(req);
478
479         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
480         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481         sa = ptlrpc_req_async_args(sa, req);
482         sa->sa_oa = oa;
483         sa->sa_upcall = upcall;
484         sa->sa_cookie = cookie;
485
486         ptlrpcd_add_req(req);
487
488         RETURN(0);
489 }
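/*
 * For example, an application fallocate(fd, 0, off, len) or
 * fallocate(fd, FALLOC_FL_KEEP_SIZE, off, len) that reaches the OSC via the
 * VFS fallocate path arrives here with mode == 0 or FALLOC_FL_KEEP_SIZE
 * respectively; any other flag combination is rejected above with
 * -EOPNOTSUPP before an RPC is built.
 */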
490
491 static int osc_sync_interpret(const struct lu_env *env,
492                               struct ptlrpc_request *req, void *args, int rc)
493 {
494         struct osc_fsync_args *fa = args;
495         struct ost_body *body;
496         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
497         unsigned long valid = 0;
498         struct cl_object *obj;
499         ENTRY;
500
501         if (rc != 0)
502                 GOTO(out, rc);
503
504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
505         if (body == NULL) {
506                 CERROR("can't unpack ost_body\n");
507                 GOTO(out, rc = -EPROTO);
508         }
509
510         *fa->fa_oa = body->oa;
511         obj = osc2cl(fa->fa_obj);
512
513         /* Update osc object's blocks attribute */
514         cl_object_attr_lock(obj);
515         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
516                 attr->cat_blocks = body->oa.o_blocks;
517                 valid |= CAT_BLOCKS;
518         }
519
520         if (valid != 0)
521                 cl_object_attr_update(env, obj, attr, valid);
522         cl_object_attr_unlock(obj);
523
524 out:
525         rc = fa->fa_upcall(fa->fa_cookie, rc);
526         RETURN(rc);
527 }
528
529 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
530                   obd_enqueue_update_f upcall, void *cookie,
531                   struct ptlrpc_request_set *rqset)
532 {
533         struct obd_export     *exp = osc_export(obj);
534         struct ptlrpc_request *req;
535         struct ost_body       *body;
536         struct osc_fsync_args *fa;
537         int                    rc;
538         ENTRY;
539
540         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
541         if (req == NULL)
542                 RETURN(-ENOMEM);
543
544         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
545         if (rc) {
546                 ptlrpc_request_free(req);
547                 RETURN(rc);
548         }
549
550         /* overload the size and blocks fields in the oa with start/end */
551         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
552         LASSERT(body);
553         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
554
555         ptlrpc_request_set_replen(req);
556         req->rq_interpret_reply = osc_sync_interpret;
557
558         fa = ptlrpc_req_async_args(fa, req);
559         fa->fa_obj = obj;
560         fa->fa_oa = oa;
561         fa->fa_upcall = upcall;
562         fa->fa_cookie = cookie;
563
564         ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 /* Find and cancel locally the locks matched by @mode in the resource found
570  * by @oa. Found locks are added to the @cancels list. Returns the number of
571  * locks added to the @cancels list. */
572 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
573                                    struct list_head *cancels,
574                                    enum ldlm_mode mode, __u64 lock_flags)
575 {
576         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
577         struct ldlm_res_id res_id;
578         struct ldlm_resource *res;
579         int count;
580         ENTRY;
581
582         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
583          * export) but disabled through procfs (flag in NS).
584          *
585  * This distinguishes it from the case when ELC is not supported at all,
586  * where we still want to cancel locks in advance and just cancel them
587          * locally, without sending any RPC. */
588         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
589                 RETURN(0);
590
591         ostid_build_res_name(&oa->o_oi, &res_id);
592         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
593         if (IS_ERR(res))
594                 RETURN(0);
595
596         LDLM_RESOURCE_ADDREF(res);
597         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
598                                            lock_flags, 0, NULL);
599         LDLM_RESOURCE_DELREF(res);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(const struct lu_env *env,
605                                  struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         wake_up(&cli->cl_destroy_waitq);
611
612         return 0;
613 }
614
615 static int osc_can_send_destroy(struct client_obd *cli)
616 {
617         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
618             cli->cl_max_rpcs_in_flight) {
619                 /* The destroy request can be sent */
620                 return 1;
621         }
622         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
623             cli->cl_max_rpcs_in_flight) {
624                 /*
625                  * The counter has been modified between the two atomic
626                  * operations.
627                  */
628                 wake_up(&cli->cl_destroy_waitq);
629         }
630         return 0;
631 }
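/*
 * Worked example of the race handled above, with
 * cl_max_rpcs_in_flight == 8 and 8 destroys already in flight:
 * atomic_inc_return() yields 9, so the send is refused. If a destroy
 * completes (9 -> 8) before our atomic_dec_return() runs, we read
 * 8 - 1 == 7 < 8: a slot opened between the two atomics, so we must wake
 * a waiter ourselves.
 */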
632
633 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
634                        struct obdo *oa)
635 {
636         struct client_obd     *cli = &exp->exp_obd->u.cli;
637         struct ptlrpc_request *req;
638         struct ost_body       *body;
639         LIST_HEAD(cancels);
640         int rc, count;
641         ENTRY;
642
643         if (!oa) {
644                 CDEBUG(D_INFO, "oa NULL\n");
645                 RETURN(-EINVAL);
646         }
647
648         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
649                                         LDLM_FL_DISCARD_DATA);
650
651         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
652         if (req == NULL) {
653                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
654                 RETURN(-ENOMEM);
655         }
656
657         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
658                                0, &cancels, count);
659         if (rc) {
660                 ptlrpc_request_free(req);
661                 RETURN(rc);
662         }
663
664         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
665         ptlrpc_at_set_req_timeout(req);
666
667         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
668         LASSERT(body);
669         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
670
671         ptlrpc_request_set_replen(req);
672
673         req->rq_interpret_reply = osc_destroy_interpret;
674         if (!osc_can_send_destroy(cli)) {
675                 /*
676                  * Wait until the number of on-going destroy RPCs drops
677                  * under max_rpc_in_flight
678                  */
679                 rc = l_wait_event_abortable_exclusive(
680                         cli->cl_destroy_waitq,
681                         osc_can_send_destroy(cli));
682                 if (rc) {
683                         ptlrpc_req_finished(req);
684                         RETURN(-EINTR);
685                 }
686         }
687
688         /* Do not wait for response */
689         ptlrpcd_add_req(req);
690         RETURN(0);
691 }
692
693 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
694                                 long writing_bytes)
695 {
696         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
697
698         LASSERT(!(oa->o_valid & bits));
699
700         oa->o_valid |= bits;
701         spin_lock(&cli->cl_loi_list_lock);
702         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
703                 oa->o_dirty = cli->cl_dirty_grant;
704         else
705                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
706         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
707                 CERROR("dirty %lu > dirty_max %lu\n",
708                        cli->cl_dirty_pages,
709                        cli->cl_dirty_max_pages);
710                 oa->o_undirty = 0;
711         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
712                             (long)(obd_max_dirty_pages + 1))) {
713                  * The atomic_long_read() here and the atomic increments are
714                  * not covered by a lock, thus they may safely race and trip
715                  * this CERROR() unless we add in a small fudge factor (+1). */
716                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
717                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long nrpages;
727                 unsigned long undirty;
728
729                 nrpages = cli->cl_max_pages_per_rpc;
730                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
731                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
732                 undirty = nrpages << PAGE_SHIFT;
733                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
734                                  GRANT_PARAM)) {
735                         int nrextents;
736
737                         /* take extent tax into account when asking for more
738                          * grant space */
739                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
740                                      cli->cl_max_extent_pages;
741                         undirty += nrextents * cli->cl_grant_extent_tax;
742                 }
743                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
744                  * to add extent tax, etc.
745                  */
746                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
747                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
748         }
749         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
750         oa->o_dropped = cli->cl_lost_grant;
751         cli->cl_lost_grant = 0;
752         spin_unlock(&cli->cl_loi_list_lock);
753         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
754                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
755 }
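/*
 * Worked example for the common branch above, assuming
 * cl_max_pages_per_rpc = 256, cl_max_rpcs_in_flight = 8, 4KiB pages,
 * cl_dirty_max_pages no larger than that product, and no GRANT_PARAM:
 * nrpages = 256 * (8 + 1) = 2304, so o_undirty asks to keep up to
 * 2304 << 12 = 9 MiB of grant, subject to the OBD_MAX_GRANT cap.
 */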
756
757 void osc_update_next_shrink(struct client_obd *cli)
758 {
759         cli->cl_next_shrink_grant = ktime_get_seconds() +
760                                     cli->cl_grant_shrink_interval;
761
762         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
763                cli->cl_next_shrink_grant);
764 }
765
766 static void __osc_update_grant(struct client_obd *cli, u64 grant)
767 {
768         spin_lock(&cli->cl_loi_list_lock);
769         cli->cl_avail_grant += grant;
770         spin_unlock(&cli->cl_loi_list_lock);
771 }
772
773 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 {
775         if (body->oa.o_valid & OBD_MD_FLGRANT) {
776                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
777                 __osc_update_grant(cli, body->oa.o_grant);
778         }
779 }
780
781 /**
782  * grant thread data for shrinking space.
783  */
784 struct grant_thread_data {
785         struct list_head        gtd_clients;
786         struct mutex            gtd_mutex;
787         unsigned long           gtd_stopped:1;
788 };
789 static struct grant_thread_data client_gtd;
790
791 static int osc_shrink_grant_interpret(const struct lu_env *env,
792                                       struct ptlrpc_request *req,
793                                       void *args, int rc)
794 {
795         struct osc_grant_args *aa = args;
796         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
797         struct ost_body *body;
798
799         if (rc != 0) {
800                 __osc_update_grant(cli, aa->aa_oa->o_grant);
801                 GOTO(out, rc);
802         }
803
804         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
805         LASSERT(body);
806         osc_update_grant(cli, body);
807 out:
808         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
809         aa->aa_oa = NULL;
810
811         return rc;
812 }
813
814 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
815 {
816         spin_lock(&cli->cl_loi_list_lock);
817         oa->o_grant = cli->cl_avail_grant / 4;
818         cli->cl_avail_grant -= oa->o_grant;
819         spin_unlock(&cli->cl_loi_list_lock);
820         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
821                 oa->o_valid |= OBD_MD_FLFLAGS;
822                 oa->o_flags = 0;
823         }
824         oa->o_flags |= OBD_FL_SHRINK_GRANT;
825         osc_update_next_shrink(cli);
826 }
827
828 /* Shrink the current grant, either from some large amount to enough for a
829  * full set of in-flight RPCs, or if we have already shrunk to that limit
830  * then to enough for a single RPC.  This avoids keeping more grant than
831  * needed, and avoids shrinking the grant piecemeal. */
832 static int osc_shrink_grant(struct client_obd *cli)
833 {
834         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
835                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
836
837         spin_lock(&cli->cl_loi_list_lock);
838         if (cli->cl_avail_grant <= target_bytes)
839                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
840         spin_unlock(&cli->cl_loi_list_lock);
841
842         return osc_shrink_grant_to_target(cli, target_bytes);
843 }
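/*
 * With the same illustrative defaults (256 pages per RPC, 8 RPCs in
 * flight, 4KiB pages): target_bytes starts at 9 * 1 MiB = 9 MiB, and once
 * cl_avail_grant has already been shrunk to 9 MiB or less, the next shrink
 * targets a single RPC worth, i.e. 1 MiB.
 */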
844
845 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
846 {
847         int                     rc = 0;
848         struct ost_body        *body;
849         ENTRY;
850
851         spin_lock(&cli->cl_loi_list_lock);
852         /* Don't shrink if we are already above or below the desired limit.
853          * We don't want to shrink below a single RPC, as that will negatively
854          * impact block allocation and long-term performance. */
855         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
856                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
857
858         if (target_bytes >= cli->cl_avail_grant) {
859                 spin_unlock(&cli->cl_loi_list_lock);
860                 RETURN(0);
861         }
862         spin_unlock(&cli->cl_loi_list_lock);
863
864         OBD_ALLOC_PTR(body);
865         if (!body)
866                 RETURN(-ENOMEM);
867
868         osc_announce_cached(cli, &body->oa, 0);
869
870         spin_lock(&cli->cl_loi_list_lock);
871         if (target_bytes >= cli->cl_avail_grant) {
872                 /* available grant has changed since target calculation */
873                 spin_unlock(&cli->cl_loi_list_lock);
874                 GOTO(out_free, rc = 0);
875         }
876         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
877         cli->cl_avail_grant = target_bytes;
878         spin_unlock(&cli->cl_loi_list_lock);
879         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
880                 body->oa.o_valid |= OBD_MD_FLFLAGS;
881                 body->oa.o_flags = 0;
882         }
883         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
884         osc_update_next_shrink(cli);
885
886         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
887                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
888                                 sizeof(*body), body, NULL);
889         if (rc != 0)
890                 __osc_update_grant(cli, body->oa.o_grant);
891 out_free:
892         OBD_FREE_PTR(body);
893         RETURN(rc);
894 }
895
896 static int osc_should_shrink_grant(struct client_obd *client)
897 {
898         time64_t next_shrink = client->cl_next_shrink_grant;
899
900         if (client->cl_import == NULL)
901                 return 0;
902
903         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
904             client->cl_import->imp_grant_shrink_disabled) {
905                 osc_update_next_shrink(client);
906                 return 0;
907         }
908
909         if (ktime_get_seconds() >= next_shrink - 5) {
910                 /* Get the current RPC size directly, instead of going via:
911                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
912                  * Keep comment here so that it can be found by searching. */
913                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
914
915                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
916                     client->cl_avail_grant > brw_size)
917                         return 1;
918                 else
919                         osc_update_next_shrink(client);
920         }
921         return 0;
922 }
923
924 #define GRANT_SHRINK_RPC_BATCH  100
925
926 static struct delayed_work work;
927
928 static void osc_grant_work_handler(struct work_struct *data)
929 {
930         struct client_obd *cli;
931         int rpc_sent;
932         bool init_next_shrink = true;
933         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
934
935         rpc_sent = 0;
936         mutex_lock(&client_gtd.gtd_mutex);
937         list_for_each_entry(cli, &client_gtd.gtd_clients,
938                             cl_grant_chain) {
939                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
940                     osc_should_shrink_grant(cli)) {
941                         osc_shrink_grant(cli);
942                         rpc_sent++;
943                 }
944
945                 if (!init_next_shrink) {
946                         if (cli->cl_next_shrink_grant < next_shrink &&
947                             cli->cl_next_shrink_grant > ktime_get_seconds())
948                                 next_shrink = cli->cl_next_shrink_grant;
949                 } else {
950                         init_next_shrink = false;
951                         next_shrink = cli->cl_next_shrink_grant;
952                 }
953         }
954         mutex_unlock(&client_gtd.gtd_mutex);
955
956         if (client_gtd.gtd_stopped == 1)
957                 return;
958
959         if (next_shrink > ktime_get_seconds()) {
960                 time64_t delay = next_shrink - ktime_get_seconds();
961
962                 schedule_delayed_work(&work, cfs_time_seconds(delay));
963         } else {
964                 schedule_work(&work.work);
965         }
966 }
967
968 void osc_schedule_grant_work(void)
969 {
970         cancel_delayed_work_sync(&work);
971         schedule_work(&work.work);
972 }
973
974 /**
975  * Start grant thread for returning grant to the server for idle clients.
976  */
977 static int osc_start_grant_work(void)
978 {
979         client_gtd.gtd_stopped = 0;
980         mutex_init(&client_gtd.gtd_mutex);
981         INIT_LIST_HEAD(&client_gtd.gtd_clients);
982
983         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
984         schedule_work(&work.work);
985
986         return 0;
987 }
988
989 static void osc_stop_grant_work(void)
990 {
991         client_gtd.gtd_stopped = 1;
992         cancel_delayed_work_sync(&work);
993 }
994
995 static void osc_add_grant_list(struct client_obd *client)
996 {
997         mutex_lock(&client_gtd.gtd_mutex);
998         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
999         mutex_unlock(&client_gtd.gtd_mutex);
1000 }
1001
1002 static void osc_del_grant_list(struct client_obd *client)
1003 {
1004         if (list_empty(&client->cl_grant_chain))
1005                 return;
1006
1007         mutex_lock(&client_gtd.gtd_mutex);
1008         list_del_init(&client->cl_grant_chain);
1009         mutex_unlock(&client_gtd.gtd_mutex);
1010 }
1011
1012 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1013 {
1014         /*
1015  * ocd_grant is the total grant amount we're expected to hold: if we've
1016          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1017          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1018          * dirty.
1019          *
1020  * A race is tolerable here: if we're evicted but imp_state has already
1021  * left the EVICTED state, then cl_dirty_pages must be 0 already.
1022          */
1023         spin_lock(&cli->cl_loi_list_lock);
1024         cli->cl_avail_grant = ocd->ocd_grant;
1025         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1026                 unsigned long consumed = cli->cl_reserved_grant;
1027
1028                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1029                         consumed += cli->cl_dirty_grant;
1030                 else
1031                         consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1032                 if (cli->cl_avail_grant < consumed) {
1033                         CERROR("%s: granted %ld but already consumed %ld\n",
1034                                cli_name(cli), cli->cl_avail_grant, consumed);
1035                         cli->cl_avail_grant = 0;
1036                 } else {
1037                         cli->cl_avail_grant -= consumed;
1038                 }
1039         }
1040
1041         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1042                 u64 size;
1043                 int chunk_mask;
1044
1045                 /* overhead for each extent insertion */
1046                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1047                 /* determine the appropriate chunk size used by osc_extent. */
1048                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1049                                           ocd->ocd_grant_blkbits);
1050                 /* max_pages_per_rpc must be chunk aligned */
1051                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1052                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1053                                              ~chunk_mask) & chunk_mask;
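                /* e.g. with 64KiB server blocks (ocd_grant_blkbits = 16)
                 * and 4KiB pages: cl_chunkbits = 16, chunk_mask = ~0xf,
                 * and a value such as 250 pages rounds up to 256.
                 */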
1054                 /* determine maximum extent size, in #pages */
1055                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1056                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
1057                 if (cli->cl_max_extent_pages == 0)
1058                         cli->cl_max_extent_pages = 1;
1059         } else {
1060                 cli->cl_grant_extent_tax = 0;
1061                 cli->cl_chunkbits = PAGE_SHIFT;
1062                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1063         }
1064         spin_unlock(&cli->cl_loi_list_lock);
1065
1066         CDEBUG(D_CACHE,
1067                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1068                cli_name(cli),
1069                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1070                cli->cl_max_extent_pages);
1071
1072         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1073                 osc_add_grant_list(cli);
1074 }
1075 EXPORT_SYMBOL(osc_init_grant);
1076
1077 /* We assume that the reason this OSC got a short read is that it read
1078  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1079  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1080  * this stripe never got written at or beyond this stripe offset yet. */
1081 static void handle_short_read(int nob_read, size_t page_count,
1082                               struct brw_page **pga)
1083 {
1084         char *ptr;
1085         int i = 0;
1086
1087         /* skip bytes read OK */
1088         while (nob_read > 0) {
1089                 LASSERT(page_count > 0);
1090
1091                 if (pga[i]->count > nob_read) {
1092                         /* EOF inside this page */
1093                         ptr = kmap(pga[i]->pg) +
1094                                 (pga[i]->off & ~PAGE_MASK);
1095                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1096                         kunmap(pga[i]->pg);
1097                         page_count--;
1098                         i++;
1099                         break;
1100                 }
1101
1102                 nob_read -= pga[i]->count;
1103                 page_count--;
1104                 i++;
1105         }
1106
1107         /* zero remaining pages */
1108         while (page_count-- > 0) {
1109                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1110                 memset(ptr, 0, pga[i]->count);
1111                 kunmap(pga[i]->pg);
1112                 i++;
1113         }
1114 }
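/*
 * For example, a 3-page (12KiB) read of full, page-aligned brw pages that
 * returns only 5000 bytes leaves page 0 intact, zeroes page 1 from byte 904
 * onward, and zeroes page 2 entirely.
 */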
1115
1116 static int check_write_rcs(struct ptlrpc_request *req,
1117                            int requested_nob, int niocount,
1118                            size_t page_count, struct brw_page **pga)
1119 {
1120         int     i;
1121         __u32   *remote_rcs;
1122
1123         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1124                                                   sizeof(*remote_rcs) *
1125                                                   niocount);
1126         if (remote_rcs == NULL) {
1127                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1128                 return(-EPROTO);
1129         }
1130
1131         /* return error if any niobuf was in error */
1132         for (i = 0; i < niocount; i++) {
1133                 if ((int)remote_rcs[i] < 0) {
1134                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1135                                i, remote_rcs[i], req);
1136                         return remote_rcs[i];
1137                 }
1138
1139                 if (remote_rcs[i] != 0) {
1140                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1141                                 i, remote_rcs[i], req);
1142                         return(-EPROTO);
1143                 }
1144         }
1145         if (req->rq_bulk != NULL &&
1146             req->rq_bulk->bd_nob_transferred != requested_nob) {
1147                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1148                        req->rq_bulk->bd_nob_transferred, requested_nob);
1149                 return(-EPROTO);
1150         }
1151
1152         return (0);
1153 }
1154
1155 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1156 {
1157         if (p1->flag != p2->flag) {
1158                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1159                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1160                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1161
1162                 /* warn if we try to combine flags that we don't know to be
1163                  * safe to combine */
1164                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1165                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1166                               "report this at https://jira.whamcloud.com/\n",
1167                               p1->flag, p2->flag);
1168                 }
1169                 return 0;
1170         }
1171
1172         return (p1->off + p1->count == p2->off);
1173 }
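/*
 * E.g. two 4KiB pages at file offsets 0 and 4096 with identical flags merge
 * into a single contiguous niobuf, while a gap between them or a flag
 * mismatch (outside the ignorable mask above) starts a new niobuf.
 */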
1174
1175 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1176 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1177                                    size_t pg_count, struct brw_page **pga,
1178                                    int opc, obd_dif_csum_fn *fn,
1179                                    int sector_size,
1180                                    u32 *check_sum)
1181 {
1182         struct ahash_request *req;
1183         /* Use Adler as the default checksum type on top of DIF tags */
1184         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1185         struct page *__page;
1186         unsigned char *buffer;
1187         __u16 *guard_start;
1188         unsigned int bufsize;
1189         int guard_number;
1190         int used_number = 0;
1191         int used;
1192         u32 cksum;
1193         int rc = 0;
1194         int i = 0;
1195
1196         LASSERT(pg_count > 0);
1197
1198         __page = alloc_page(GFP_KERNEL);
1199         if (__page == NULL)
1200                 return -ENOMEM;
1201
1202         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1203         if (IS_ERR(req)) {
1204                 rc = PTR_ERR(req);
1205                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1206                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1207                 GOTO(out, rc);
1208         }
1209
1210         buffer = kmap(__page);
1211         guard_start = (__u16 *)buffer;
1212         guard_number = PAGE_SIZE / sizeof(*guard_start);
1213         while (nob > 0 && pg_count > 0) {
1214                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1215
1216                 /* corrupt the data before we compute the checksum, to
1217                  * simulate an OST->client data error */
1218                 if (unlikely(i == 0 && opc == OST_READ &&
1219                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1220                         unsigned char *ptr = kmap(pga[i]->pg);
1221                         int off = pga[i]->off & ~PAGE_MASK;
1222
1223                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1224                         kunmap(pga[i]->pg);
1225                 }
1226
1227                 /*
1228                  * The remaining guard slots should be able to hold the
1229                  * checksums of a whole page
1230                  */
1231                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1232                                                   pga[i]->off & ~PAGE_MASK,
1233                                                   count,
1234                                                   guard_start + used_number,
1235                                                   guard_number - used_number,
1236                                                   &used, sector_size,
1237                                                   fn);
1238                 if (rc)
1239                         break;
1240
1241                 used_number += used;
1242                 if (used_number == guard_number) {
1243                         cfs_crypto_hash_update_page(req, __page, 0,
1244                                 used_number * sizeof(*guard_start));
1245                         used_number = 0;
1246                 }
1247
1248                 nob -= pga[i]->count;
1249                 pg_count--;
1250                 i++;
1251         }
1252         kunmap(__page);
1253         if (rc)
1254                 GOTO(out, rc);
1255
1256         if (used_number != 0)
1257                 cfs_crypto_hash_update_page(req, __page, 0,
1258                         used_number * sizeof(*guard_start));
1259
1260         bufsize = sizeof(cksum);
1261         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1262
1263         /* For sending we only compute the wrong checksum instead
1264          * of corrupting the data so it is still correct on a redo */
1265         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1266                 cksum++;
1267
1268         *check_sum = cksum;
1269 out:
1270         __free_page(__page);
1271         return rc;
1272 }
1273 #else /* !CONFIG_CRC_T10DIF */
1274 #define obd_dif_ip_fn NULL
1275 #define obd_dif_crc_fn NULL
1276 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1277         -EOPNOTSUPP
1278 #endif /* CONFIG_CRC_T10DIF */
1279
1280 static int osc_checksum_bulk(int nob, size_t pg_count,
1281                              struct brw_page **pga, int opc,
1282                              enum cksum_types cksum_type,
1283                              u32 *cksum)
1284 {
1285         int                             i = 0;
1286         struct ahash_request           *req;
1287         unsigned int                    bufsize;
1288         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1289
1290         LASSERT(pg_count > 0);
1291
1292         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1293         if (IS_ERR(req)) {
1294                 CERROR("Unable to initialize checksum hash %s\n",
1295                        cfs_crypto_hash_name(cfs_alg));
1296                 return PTR_ERR(req);
1297         }
1298
1299         while (nob > 0 && pg_count > 0) {
1300                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1301
1302                 /* corrupt the data before we compute the checksum, to
1303                  * simulate an OST->client data error */
1304                 if (i == 0 && opc == OST_READ &&
1305                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1306                         unsigned char *ptr = kmap(pga[i]->pg);
1307                         int off = pga[i]->off & ~PAGE_MASK;
1308
1309                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1310                         kunmap(pga[i]->pg);
1311                 }
1312                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1313                                             pga[i]->off & ~PAGE_MASK,
1314                                             count);
1315                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1316                                (int)(pga[i]->off & ~PAGE_MASK));
1317
1318                 nob -= pga[i]->count;
1319                 pg_count--;
1320                 i++;
1321         }
1322
1323         bufsize = sizeof(*cksum);
1324         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1325
1326         /* For sending we only compute the wrong checksum instead
1327          * of corrupting the data so it is still correct on a redo */
1328         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1329                 (*cksum)++;
1330
1331         return 0;
1332 }
1333
1334 static int osc_checksum_bulk_rw(const char *obd_name,
1335                                 enum cksum_types cksum_type,
1336                                 int nob, size_t pg_count,
1337                                 struct brw_page **pga, int opc,
1338                                 u32 *check_sum)
1339 {
1340         obd_dif_csum_fn *fn = NULL;
1341         int sector_size = 0;
1342         int rc;
1343
1344         ENTRY;
1345         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1346
1347         if (fn)
1348                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1349                                              opc, fn, sector_size, check_sum);
1350         else
1351                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1352                                        check_sum);
1353
1354         RETURN(rc);
1355 }
1356
1357 static inline void osc_release_bounce_pages(struct brw_page **pga,
1358                                             u32 page_count)
1359 {
1360 #ifdef HAVE_LUSTRE_CRYPTO
1361         int i;
1362
1363         for (i = 0; i < page_count; i++) {
1364                 /* Bounce pages allocated by a call to
1365                  * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
1366                  * are identified thanks to the PageChecked flag.
1367                  */
1368                 if (PageChecked(pga[i]->pg))
1369                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1370                 pga[i]->count -= pga[i]->bp_count_diff;
1371                 pga[i]->off += pga[i]->bp_off_diff;
1372         }
1373 #endif
1374 }
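/*
 * The bp_count_diff/bp_off_diff rollback above undoes the rounding done in
 * osc_brw_prep_request(): e.g. a 2KiB clear-text write that was widened to a
 * full encryption unit gets its original count and page offset back once its
 * bounce page has been released.
 */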
1375
1376 static int
1377 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1378                      u32 page_count, struct brw_page **pga,
1379                      struct ptlrpc_request **reqp, int resend)
1380 {
1381         struct ptlrpc_request *req;
1382         struct ptlrpc_bulk_desc *desc;
1383         struct ost_body *body;
1384         struct obd_ioobj *ioobj;
1385         struct niobuf_remote *niobuf;
1386         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1387         struct osc_brw_async_args *aa;
1388         struct req_capsule *pill;
1389         struct brw_page *pg_prev;
1390         void *short_io_buf;
1391         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1392         struct inode *inode;
1393
1394         ENTRY;
1395         inode = page2inode(pga[0]->pg);
1396         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1397                 RETURN(-ENOMEM); /* Recoverable */
1398         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1399                 RETURN(-EINVAL); /* Fatal */
1400
1401         if ((cmd & OBD_BRW_WRITE) != 0) {
1402                 opc = OST_WRITE;
1403                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1404                                                 osc_rq_pool,
1405                                                 &RQF_OST_BRW_WRITE);
1406         } else {
1407                 opc = OST_READ;
1408                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1409         }
1410         if (req == NULL)
1411                 RETURN(-ENOMEM);
1412
1413         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1414                 for (i = 0; i < page_count; i++) {
1415                         struct brw_page *pg = pga[i];
1416                         struct page *data_page = NULL;
1417                         bool retried = false;
1418                         bool lockedbymyself;
1419                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1420
1421 retry_encrypt:
1422                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1423                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1424                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
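                        /* e.g. a 2KiB write within a page rounds nunits
                         * up to one full LUSTRE_ENCRYPTION_UNIT_SIZE,
                         * as ciphertext is stored in whole units.
                         */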
1425                         /* The page can already be locked when we arrive here.
1426                          * This is possible when cl_page_assume/vvp_page_assume
1427                          * is stuck on wait_on_page_writeback with page lock
1428                          * held. In this case there is no risk for the lock to
1429                          * be released while we are doing our encryption
1430                          * processing, because writeback against that page will
1431                          * end in vvp_page_completion_write/cl_page_completion,
1432                          * which means only once the page is fully processed.
1433                          */
1434                         lockedbymyself = trylock_page(pg->pg);
1435                         data_page =
1436                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1437                                                                  nunits, 0,
1438                                                                  GFP_NOFS);
1439                         if (lockedbymyself)
1440                                 unlock_page(pg->pg);
1441                         if (IS_ERR(data_page)) {
1442                                 rc = PTR_ERR(data_page);
1443                                 if (rc == -ENOMEM && !retried) {
1444                                         retried = true;
1445                                         rc = 0;
1446                                         goto retry_encrypt;
1447                                 }
1448                                 ptlrpc_request_free(req);
1449                                 RETURN(rc);
1450                         }
1451                         /* Set PageChecked flag on bounce page for
1452                          * disambiguation in osc_release_bounce_pages().
1453                          */
1454                         SetPageChecked(data_page);
1455                         pg->pg = data_page;
1456                         /* there should be no gap in the middle of page array */
1457                         if (i == page_count - 1) {
1458                                 struct osc_async_page *oap = brw_page2oap(pg);
1459
1460                                 oa->o_size = oap->oap_count +
1461                                         oap->oap_obj_off + oap->oap_page_off;
1462                         }
1463                         /* len is forced to nunits, and the relative offset
1464                          * to 0, so store the old, clear-text values
1465                          */
1466                         pg->bp_count_diff = nunits - pg->count;
1467                         pg->count = nunits;
1468                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1469                         pg->off = pg->off & PAGE_MASK;
1470                 }
1471         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1472                 for (i = 0; i < page_count; i++) {
1473                         struct brw_page *pg = pga[i];
1474                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1475
1476                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1477                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1478                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1479                         /* count/off are forced to cover the whole encryption
1480                          * unit size so that all encrypted data is stored on the
1481                          * OST, so adjust bp_{count,off}_diff for the size of
1482                          * the clear text.
1483                          */
1484                         pg->bp_count_diff = nunits - pg->count;
1485                         pg->count = nunits;
1486                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1487                         pg->off = pg->off & PAGE_MASK;
1488                 }
1489         }
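        /* A worked example of the rounding above (illustrative only,
         * assuming PAGE_SIZE and LUSTRE_ENCRYPTION_UNIT_SIZE are both 4096):
         *
         *   pg->off = 9216, pg->count = 1024
         *   nunits  = (9216 & ~PAGE_MASK) + 1024 = 1024 + 1024 = 2048
         *   2048 is not a whole encryption unit, so it is rounded up:
         *   nunits  = (2048 & LUSTRE_ENCRYPTION_MASK) + 4096 = 0 + 4096 = 4096
         *
         * i.e. count/off are widened to cover a full encryption unit, with
         * the deltas kept in bp_count_diff/bp_off_diff for later restore.
         */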
1490
1491         for (niocount = i = 1; i < page_count; i++) {
1492                 if (!can_merge_pages(pga[i - 1], pga[i]))
1493                         niocount++;
1494         }
1495
1496         pill = &req->rq_pill;
1497         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1498                              sizeof(*ioobj));
1499         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1500                              niocount * sizeof(*niobuf));
1501
1502         for (i = 0; i < page_count; i++) {
1503                 short_io_size += pga[i]->count;
1504                 if (!inode || !IS_ENCRYPTED(inode)) {
1505                         pga[i]->bp_count_diff = 0;
1506                         pga[i]->bp_off_diff = 0;
1507                 }
1508         }
1509
1510         /* Check if read/write is small enough to be a short io. */
1511         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1512             !imp_connect_shortio(cli->cl_import))
1513                 short_io_size = 0;
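        /* An illustration of the short io decision (values hypothetical):
         * with a cl_max_short_io_bytes of 16KB, a single contiguous 4KB
         * write (niocount == 1) keeps short_io_size == 4096 and the data is
         * later copied inline into the RMF_SHORT_IO buffer, while a 1MB or
         * multi-niobuf request falls back to bulk (short_io_size = 0). */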
1514
1515         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1516                              opc == OST_READ ? 0 : short_io_size);
1517         if (opc == OST_READ)
1518                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1519                                      short_io_size);
1520
1521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1522         if (rc) {
1523                 ptlrpc_request_free(req);
1524                 RETURN(rc);
1525         }
1526         osc_set_io_portal(req);
1527
1528         ptlrpc_at_set_req_timeout(req);
1529         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1530          * retry logic */
1531         req->rq_no_retry_einprogress = 1;
1532
1533         if (short_io_size != 0) {
1534                 desc = NULL;
1535                 short_io_buf = NULL;
1536                 goto no_bulk;
1537         }
1538
1539         desc = ptlrpc_prep_bulk_imp(req, page_count,
1540                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1541                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1542                         PTLRPC_BULK_PUT_SINK),
1543                 OST_BULK_PORTAL,
1544                 &ptlrpc_bulk_kiov_pin_ops);
1545
1546         if (desc == NULL)
1547                 GOTO(out, rc = -ENOMEM);
1548         /* NB request now owns desc and will free it when it gets freed */
1549 no_bulk:
1550         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1551         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1552         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1553         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1554
1555         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1556
1557         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1558          * and from_kgid(), because they are asynchronous. Fortunately, the oa
1559          * variable contains valid o_uid and o_gid in these two operations.
1560          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1561          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1562          * other process logic */
1563         body->oa.o_uid = oa->o_uid;
1564         body->oa.o_gid = oa->o_gid;
1565
1566         obdo_to_ioobj(oa, ioobj);
1567         ioobj->ioo_bufcnt = niocount;
1568         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1569          * bulks that might be sent for this request.  The actual number is
1570          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1571          * sends "max - 1" for compatibility with old clients sending "0", and
1572          * also so that the actual maximum is a power-of-two, not one less. LU-1431 */
1573         if (desc != NULL)
1574                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1575         else /* short io */
1576                 ioobj_max_brw_set(ioobj, 0);
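        /* Example of the "max - 1" encoding noted above (values are
         * illustrative): if the bulk descriptor allows 4 MDs
         * (bd_md_max_brw == 4), the high bits of ioo_max_brw carry 3. An
         * old client that never sets the field sends 0, which under this
         * encoding still reads as one bulk, so compatibility is kept and
         * the actual maximum stays a power of two. */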
1577
1578         if (short_io_size != 0) {
1579                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1580                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1581                         body->oa.o_flags = 0;
1582                 }
1583                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1584                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1585                        short_io_size);
1586                 if (opc == OST_WRITE) {
1587                         short_io_buf = req_capsule_client_get(pill,
1588                                                               &RMF_SHORT_IO);
1589                         LASSERT(short_io_buf != NULL);
1590                 }
1591         }
1592
1593         LASSERT(page_count > 0);
1594         pg_prev = pga[0];
1595         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1596                 struct brw_page *pg = pga[i];
1597                 int poff = pg->off & ~PAGE_MASK;
1598
1599                 LASSERT(pg->count > 0);
1600                 /* make sure there is no gap in the middle of page array */
1601                 LASSERTF(page_count == 1 ||
1602                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1603                           ergo(i > 0 && i < page_count - 1,
1604                                poff == 0 && pg->count == PAGE_SIZE)   &&
1605                           ergo(i == page_count - 1, poff == 0)),
1606                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1607                          i, page_count, pg, pg->off, pg->count);
1608                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1609                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1610                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1611                          i, page_count,
1612                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1613                          pg_prev->pg, page_private(pg_prev->pg),
1614                          pg_prev->pg->index, pg_prev->off);
1615                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1616                         (pg->flag & OBD_BRW_SRVLOCK));
1617                 if (short_io_size != 0 && opc == OST_WRITE) {
1618                         unsigned char *ptr = kmap_atomic(pg->pg);
1619
1620                         LASSERT(short_io_size >= requested_nob + pg->count);
1621                         memcpy(short_io_buf + requested_nob,
1622                                ptr + poff,
1623                                pg->count);
1624                         kunmap_atomic(ptr);
1625                 } else if (short_io_size == 0) {
1626                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1627                                                          pg->count);
1628                 }
1629                 requested_nob += pg->count;
1630
1631                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1632                         niobuf--;
1633                         niobuf->rnb_len += pg->count;
1634                 } else {
1635                         niobuf->rnb_offset = pg->off;
1636                         niobuf->rnb_len    = pg->count;
1637                         niobuf->rnb_flags  = pg->flag;
1638                 }
1639                 pg_prev = pg;
1640         }
1641
1642         LASSERTF((void *)(niobuf - niocount) ==
1643                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1644                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1645                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1646
1647         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1648         if (resend) {
1649                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1650                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1651                         body->oa.o_flags = 0;
1652                 }
1653                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1654         }
1655
1656         if (osc_should_shrink_grant(cli))
1657                 osc_shrink_grant_local(cli, &body->oa);
1658
1659         /* size[REQ_REC_OFF] still sizeof (*body) */
1660         if (opc == OST_WRITE) {
1661                 if (cli->cl_checksum &&
1662                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1663                         /* store cl_cksum_type in a local variable since
1664                          * it can be changed via lprocfs */
1665                         enum cksum_types cksum_type = cli->cl_cksum_type;
1666
1667                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1668                                 body->oa.o_flags = 0;
1669
1670                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1671                                                                 cksum_type);
1672                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1673
1674                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1675                                                   requested_nob, page_count,
1676                                                   pga, OST_WRITE,
1677                                                   &body->oa.o_cksum);
1678                         if (rc < 0) {
1679                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1680                                        rc);
1681                                 GOTO(out, rc);
1682                         }
1683                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1684                                body->oa.o_cksum);
1685
1686                         /* save this in 'oa', too, for later checking */
1687                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1688                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1689                                                            cksum_type);
1690                 } else {
1691                         /* clear out the checksum flag, in case this is a
1692                          * resend but cl_checksum is no longer set. b=11238 */
1693                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1694                 }
1695                 oa->o_cksum = body->oa.o_cksum;
1696                 /* 1 RC per niobuf */
1697                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1698                                      sizeof(__u32) * niocount);
1699         } else {
1700                 if (cli->cl_checksum &&
1701                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1702                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1703                                 body->oa.o_flags = 0;
1704                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1705                                 cli->cl_cksum_type);
1706                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1707                 }
1708
1709         /* The client cksum has already been copied to the wire obdo by the
1710          * earlier lustre_set_wire_obdo(), so in case a bulk read is being
1711          * resent due to a cksum error, this allows the server to
1712          * check+dump pages on its side */
1713         }
1714         ptlrpc_request_set_replen(req);
1715
1716         aa = ptlrpc_req_async_args(aa, req);
1717         aa->aa_oa = oa;
1718         aa->aa_requested_nob = requested_nob;
1719         aa->aa_nio_count = niocount;
1720         aa->aa_page_count = page_count;
1721         aa->aa_resends = 0;
1722         aa->aa_ppga = pga;
1723         aa->aa_cli = cli;
1724         INIT_LIST_HEAD(&aa->aa_oaps);
1725
1726         *reqp = req;
1727         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1728         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1729                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1730                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1731         RETURN(0);
1732
1733  out:
1734         ptlrpc_req_finished(req);
1735         RETURN(rc);
1736 }
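
/* A minimal usage sketch for osc_brw_prep_request(), mirroring its real
 * callers osc_build_rpc() and osc_brw_redo_request() (error handling and
 * the interpret/commit callbacks are trimmed for brevity):
 *
 *      struct ptlrpc_request *req = NULL;
 *
 *      sort_brw_pages(pga, page_count);
 *      rc = osc_brw_prep_request(OBD_BRW_WRITE, cli, oa, page_count,
 *                                pga, &req, 0);
 *      if (rc == 0)
 *              ptlrpcd_add_req(req);
 */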
1737
1738 char dbgcksum_file_name[PATH_MAX];
1739
1740 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1741                                 struct brw_page **pga, __u32 server_cksum,
1742                                 __u32 client_cksum)
1743 {
1744         struct file *filp;
1745         int rc, i;
1746         unsigned int len;
1747         char *buf;
1748
1749         /* only keep a dump of the pages on the first error for a given range
1750          * in the file/fid, not during resends/retries. */
1751         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1752                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1753                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1754                   libcfs_debug_file_path_arr :
1755                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1756                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1757                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1758                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1759                  pga[0]->off,
1760                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1761                  client_cksum, server_cksum);
1762         filp = filp_open(dbgcksum_file_name,
1763                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1764         if (IS_ERR(filp)) {
1765                 rc = PTR_ERR(filp);
1766                 if (rc == -EEXIST)
1767                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1768                                "checksum error: rc = %d\n", dbgcksum_file_name,
1769                                rc);
1770                 else
1771                         CERROR("%s: can't open to dump pages with checksum "
1772                                "error: rc = %d\n", dbgcksum_file_name, rc);
1773                 return;
1774         }
1775
1776         for (i = 0; i < page_count; i++) {
1777                 len = pga[i]->count;
1778                 buf = kmap(pga[i]->pg);
1779                 while (len != 0) {
1780                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1781                         if (rc < 0) {
1782                                 CERROR("%s: wanted to write %u but got %d "
1783                                        "error\n", dbgcksum_file_name, len, rc);
1784                                 break;
1785                         }
1786                         len -= rc;
1787                         buf += rc;
1788                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1789                                dbgcksum_file_name, rc);
1790                 }
1791                 kunmap(pga[i]->pg);
1792         }
1793
1794         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1795         if (rc)
1796                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1797         filp_close(filp, NULL);
1798 }
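
/* For reference, a dump file created above looks something like this
 * (path, FID and checksum values are made up; the prefix comes from
 * libcfs_debug_file_path_arr or LIBCFS_DEBUG_FILE_PATH_DEFAULT):
 *
 *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x2:0x0]:[0-1048575]-c2f8e3a1-9d4b7f02
 *
 * i.e. <path>-checksum_dump-osc-<parent FID>:[<start>-<end>]-<client>-<server>
 */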
1799
1800 static int
1801 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1802                      __u32 client_cksum, __u32 server_cksum,
1803                      struct osc_brw_async_args *aa)
1804 {
1805         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1806         enum cksum_types cksum_type;
1807         obd_dif_csum_fn *fn = NULL;
1808         int sector_size = 0;
1809         __u32 new_cksum;
1810         char *msg;
1811         int rc;
1812
1813         if (server_cksum == client_cksum) {
1814                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1815                 return 0;
1816         }
1817
1818         if (aa->aa_cli->cl_checksum_dump)
1819                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1820                                     server_cksum, client_cksum);
1821
1822         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1823                                            oa->o_flags : 0);
1824
1825         switch (cksum_type) {
1826         case OBD_CKSUM_T10IP512:
1827                 fn = obd_dif_ip_fn;
1828                 sector_size = 512;
1829                 break;
1830         case OBD_CKSUM_T10IP4K:
1831                 fn = obd_dif_ip_fn;
1832                 sector_size = 4096;
1833                 break;
1834         case OBD_CKSUM_T10CRC512:
1835                 fn = obd_dif_crc_fn;
1836                 sector_size = 512;
1837                 break;
1838         case OBD_CKSUM_T10CRC4K:
1839                 fn = obd_dif_crc_fn;
1840                 sector_size = 4096;
1841                 break;
1842         default:
1843                 break;
1844         }
1845
1846         if (fn)
1847                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1848                                              aa->aa_page_count, aa->aa_ppga,
1849                                              OST_WRITE, fn, sector_size,
1850                                              &new_cksum);
1851         else
1852                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1853                                        aa->aa_ppga, OST_WRITE, cksum_type,
1854                                        &new_cksum);
1855
1856         if (rc < 0)
1857                 msg = "failed to calculate the client write checksum";
1858         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1859                 msg = "the server did not use the checksum type specified in "
1860                       "the original request - likely a protocol problem";
1861         else if (new_cksum == server_cksum)
1862                 msg = "changed on the client after we checksummed it - "
1863                       "likely false positive due to mmap IO (bug 11742)";
1864         else if (new_cksum == client_cksum)
1865                 msg = "changed in transit before arrival at OST";
1866         else
1867                 msg = "changed in transit AND doesn't match the original - "
1868                       "likely false positive due to mmap IO (bug 11742)";
1869
1870         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1871                            DFID " object "DOSTID" extent [%llu-%llu], original "
1872                            "client csum %x (type %x), server csum %x (type %x),"
1873                            " client csum now %x\n",
1874                            obd_name, msg, libcfs_nid2str(peer->nid),
1875                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1876                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1877                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1878                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1879                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1880                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1881                            client_cksum,
1882                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1883                            server_cksum, cksum_type, new_cksum);
1884         return 1;
1885 }
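
/* Summary of the diagnosis above once client and server checksums differ
 * and the client has recomputed the checksum (new_cksum) over its pages:
 *
 *   new_cksum == server_cksum  -> data changed on the client after the
 *                                 original checksum (likely mmap IO)
 *   new_cksum == client_cksum  -> data changed in transit before the OST
 *   neither                    -> changed in transit AND doesn't match the
 *                                 original (likely mmap IO as well)
 *
 * In every case the function returns 1 so the caller resends the write.
 */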
1886
1887 /* Note rc enters this function as the number of bytes transferred */
1888 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1889 {
1890         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1891         struct client_obd *cli = aa->aa_cli;
1892         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1893         const struct lnet_process_id *peer =
1894                 &req->rq_import->imp_connection->c_peer;
1895         struct ost_body *body;
1896         u32 client_cksum = 0;
1897         struct inode *inode;
1898
1899         ENTRY;
1900
1901         if (rc < 0 && rc != -EDQUOT) {
1902                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1903                 RETURN(rc);
1904         }
1905
1906         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1907         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1908         if (body == NULL) {
1909                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1910                 RETURN(-EPROTO);
1911         }
1912
1913         /* set/clear over quota flag for a uid/gid/projid */
1914         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1915             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1916                 unsigned qid[LL_MAXQUOTAS] = {
1917                                          body->oa.o_uid, body->oa.o_gid,
1918                                          body->oa.o_projid };
1919                 CDEBUG(D_QUOTA,
1920                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1921                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1922                        body->oa.o_valid, body->oa.o_flags);
1923                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1924                                 body->oa.o_flags);
1925         }
1926
1927         osc_update_grant(cli, body);
1928
1929         if (rc < 0)
1930                 RETURN(rc);
1931
1932         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1933                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1934
1935         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1936                 if (rc > 0) {
1937                         CERROR("%s: unexpected positive size %d\n",
1938                                obd_name, rc);
1939                         RETURN(-EPROTO);
1940                 }
1941
1942                 if (req->rq_bulk != NULL &&
1943                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1944                         RETURN(-EAGAIN);
1945
1946                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1947                     check_write_checksum(&body->oa, peer, client_cksum,
1948                                          body->oa.o_cksum, aa))
1949                         RETURN(-EAGAIN);
1950
1951                 rc = check_write_rcs(req, aa->aa_requested_nob,
1952                                      aa->aa_nio_count, aa->aa_page_count,
1953                                      aa->aa_ppga);
1954                 GOTO(out, rc);
1955         }
1956
1957         /* The rest of this function executes only for OST_READs */
1958
1959         if (req->rq_bulk == NULL) {
1960                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1961                                           RCL_SERVER);
1962                 LASSERT(rc == req->rq_status);
1963         } else {
1964                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1965                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1966         }
1967         if (rc < 0)
1968                 GOTO(out, rc = -EAGAIN);
1969
1970         if (rc > aa->aa_requested_nob) {
1971                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1972                        rc, aa->aa_requested_nob);
1973                 RETURN(-EPROTO);
1974         }
1975
1976         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1977                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1978                        rc, req->rq_bulk->bd_nob_transferred);
1979                 RETURN(-EPROTO);
1980         }
1981
1982         if (req->rq_bulk == NULL) {
1983                 /* short io */
1984                 int nob, pg_count, i = 0;
1985                 unsigned char *buf;
1986
1987                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1988                 pg_count = aa->aa_page_count;
1989                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1990                                                    rc);
1991                 nob = rc;
1992                 while (nob > 0 && pg_count > 0) {
1993                         unsigned char *ptr;
1994                         int count = aa->aa_ppga[i]->count > nob ?
1995                                     nob : aa->aa_ppga[i]->count;
1996
1997                         CDEBUG(D_CACHE, "page %p count %d\n",
1998                                aa->aa_ppga[i]->pg, count);
1999                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2000                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2001                                count);
2002                         kunmap_atomic((void *) ptr);
2003
2004                         buf += count;
2005                         nob -= count;
2006                         i++;
2007                         pg_count--;
2008                 }
2009         }
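        /* e.g. a 6KB short io read spanning two pages (sizes illustrative,
         * assuming PAGE_SIZE == 4096): the first pass copies
         * min(4096, 6144) = 4096 bytes into page 0, the second copies the
         * remaining 2048 into page 1 at its in-page offset, then nob == 0
         * and the loop ends. */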
2010
2011         if (rc < aa->aa_requested_nob)
2012                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2013
2014         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2015                 static int cksum_counter;
2016                 u32        server_cksum = body->oa.o_cksum;
2017                 char      *via = "";
2018                 char      *router = "";
2019                 enum cksum_types cksum_type;
2020                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2021                         body->oa.o_flags : 0;
2022
2023                 cksum_type = obd_cksum_type_unpack(o_flags);
2024                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2025                                           aa->aa_page_count, aa->aa_ppga,
2026                                           OST_READ, &client_cksum);
2027                 if (rc < 0)
2028                         GOTO(out, rc);
2029
2030                 if (req->rq_bulk != NULL &&
2031                     peer->nid != req->rq_bulk->bd_sender) {
2032                         via = " via ";
2033                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2034                 }
2035
2036                 if (server_cksum != client_cksum) {
2037                         struct ost_body *clbody;
2038                         u32 page_count = aa->aa_page_count;
2039
2040                         clbody = req_capsule_client_get(&req->rq_pill,
2041                                                         &RMF_OST_BODY);
2042                         if (cli->cl_checksum_dump)
2043                                 dump_all_bulk_pages(&clbody->oa, page_count,
2044                                                     aa->aa_ppga, server_cksum,
2045                                                     client_cksum);
2046
2047                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2048                                            "%s%s%s inode "DFID" object "DOSTID
2049                                            " extent [%llu-%llu], client %x, "
2050                                            "server %x, cksum_type %x\n",
2051                                            obd_name,
2052                                            libcfs_nid2str(peer->nid),
2053                                            via, router,
2054                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2055                                                 clbody->oa.o_parent_seq : 0ULL,
2056                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2057                                                 clbody->oa.o_parent_oid : 0,
2058                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2059                                                 clbody->oa.o_parent_ver : 0,
2060                                            POSTID(&body->oa.o_oi),
2061                                            aa->aa_ppga[0]->off,
2062                                            aa->aa_ppga[page_count-1]->off +
2063                                            aa->aa_ppga[page_count-1]->count - 1,
2064                                            client_cksum, server_cksum,
2065                                            cksum_type);
2066                         cksum_counter = 0;
2067                         aa->aa_oa->o_cksum = client_cksum;
2068                         rc = -EAGAIN;
2069                 } else {
2070                         cksum_counter++;
2071                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2072                         rc = 0;
2073                 }
2074         } else if (unlikely(client_cksum)) {
2075                 static int cksum_missed;
2076
2077                 cksum_missed++;
2078                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2079                         CERROR("%s: checksum %u requested from %s but not sent\n",
2080                                obd_name, cksum_missed,
2081                                libcfs_nid2str(peer->nid));
2082         } else {
2083                 rc = 0;
2084         }
2085
2086         inode = page2inode(aa->aa_ppga[0]->pg);
2087         if (inode && IS_ENCRYPTED(inode)) {
2088                 int idx;
2089
2090                 if (!llcrypt_has_encryption_key(inode)) {
2091                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2092                         GOTO(out, rc);
2093                 }
2094                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2095                         struct brw_page *pg = aa->aa_ppga[idx];
2096                         unsigned int offs = 0;
2097
2098                         while (offs < PAGE_SIZE) {
2099                                 /* do not decrypt if page is all 0s */
2100                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2101                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2102                                         /* if the page is empty, forward this
2103                                          * to upper layers (ll_io_zero_page)
2104                                          * by clearing PagePrivate2
2105                                          */
2106                                         if (!offs)
2107                                                 ClearPagePrivate2(pg->pg);
2108                                         break;
2109                                 }
2110
2111                                 /* The page is already locked when we arrive here,
2112                                  * except when we deal with a twisted page for
2113                                  * specific Direct IO support, in which case the
2114                                  * PageChecked flag is set on the page.
2115                                  */
2116                                 if (PageChecked(pg->pg))
2117                                         lock_page(pg->pg);
2118                                 rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2119                                                     LUSTRE_ENCRYPTION_UNIT_SIZE,
2120                                                                       offs);
2121                                 if (PageChecked(pg->pg))
2122                                         unlock_page(pg->pg);
2123                                 if (rc)
2124                                         GOTO(out, rc);
2125
2126                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2127                         }
2128                 }
2129         }
2130
2131 out:
2132         if (rc >= 0)
2133                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2134                                      aa->aa_oa, &body->oa);
2135
2136         RETURN(rc);
2137 }
2138
2139 static int osc_brw_redo_request(struct ptlrpc_request *request,
2140                                 struct osc_brw_async_args *aa, int rc)
2141 {
2142         struct ptlrpc_request *new_req;
2143         struct osc_brw_async_args *new_aa;
2144         struct osc_async_page *oap;
2145         ENTRY;
2146
2147         /* The message below is checked in replay-ost-single.sh test_8ae */
2148         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2149                   "redo for recoverable error %d", rc);
2150
2151         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2152                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2153                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2154                                   aa->aa_ppga, &new_req, 1);
2155         if (rc)
2156                 RETURN(rc);
2157
2158         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2159                 if (oap->oap_request != NULL) {
2160                         LASSERTF(request == oap->oap_request,
2161                                  "request %p != oap_request %p\n",
2162                                  request, oap->oap_request);
2163                 }
2164         }
2165         /*
2166          * New request takes over pga and oaps from old request.
2167          * Note that copying a list_head doesn't work, need to move it...
2168          */
2169         aa->aa_resends++;
2170         new_req->rq_interpret_reply = request->rq_interpret_reply;
2171         new_req->rq_async_args = request->rq_async_args;
2172         new_req->rq_commit_cb = request->rq_commit_cb;
2173         /* cap the resend delay to the current request timeout; this is similar
2174          * to what ptlrpc does (see after_reply()) */
2175         if (aa->aa_resends > new_req->rq_timeout)
2176                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2177         else
2178                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2179         new_req->rq_generation_set = 1;
2180         new_req->rq_import_generation = request->rq_import_generation;
2181
2182         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2183
2184         INIT_LIST_HEAD(&new_aa->aa_oaps);
2185         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2186         INIT_LIST_HEAD(&new_aa->aa_exts);
2187         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2188         new_aa->aa_resends = aa->aa_resends;
2189
2190         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2191                 if (oap->oap_request) {
2192                         ptlrpc_req_finished(oap->oap_request);
2193                         oap->oap_request = ptlrpc_request_addref(new_req);
2194                 }
2195         }
2196
2197         /* XXX: This code will run into problems if we ever support adding
2198          * a series of BRW RPCs to a self-defined ptlrpc_request_set and
2199          * waiting for all of them to finish. We should inherit the request
2200          * set from the old request. */
2201         ptlrpcd_add_req(new_req);
2202
2203         DEBUG_REQ(D_INFO, new_req, "new request");
2204         RETURN(0);
2205 }
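
/* An illustration of the resend backoff above: with rq_timeout == 30s the
 * first resends go out after 1s, 2s, 3s, ... (rq_sent = now + aa_resends),
 * and once aa_resends exceeds rq_timeout the delay is capped at 30s,
 * matching what ptlrpc itself does in after_reply().
 */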
2206
2207 /*
2208  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2209  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2210  * fine for our small page arrays and doesn't require allocation.  it's an
2211  * insertion sort that swaps elements that are strides apart, shrinking the
2212  * stride down until it's '1' and the array is sorted.
2213  */
2214 static void sort_brw_pages(struct brw_page **array, int num)
2215 {
2216         int stride, i, j;
2217         struct brw_page *tmp;
2218
2219         if (num == 1)
2220                 return;
2221         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2222                 ;
2223
2224         do {
2225                 stride /= 3;
2226                 for (i = stride ; i < num ; i++) {
2227                         tmp = array[i];
2228                         j = i;
2229                         while (j >= stride && array[j - stride]->off > tmp->off) {
2230                                 array[j] = array[j - stride];
2231                                 j -= stride;
2232                         }
2233                         array[j] = tmp;
2234                 }
2235         } while (stride > 1);
2236 }
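
/* Stride sequence example for the sort above with num == 100: the first
 * loop grows the stride 1, 4, 13, 40, 121 and stops at 121; the do-while
 * then sorts with strides 40, 13, 4 and finally 1 (plain insertion sort on
 * an almost-sorted array). This is the classic h = 3h + 1 gap sequence.
 */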
2237
2238 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2239 {
2240         LASSERT(ppga != NULL);
2241         OBD_FREE_PTR_ARRAY(ppga, count);
2242 }
2243
2244 static int brw_interpret(const struct lu_env *env,
2245                          struct ptlrpc_request *req, void *args, int rc)
2246 {
2247         struct osc_brw_async_args *aa = args;
2248         struct osc_extent *ext;
2249         struct osc_extent *tmp;
2250         struct client_obd *cli = aa->aa_cli;
2251         unsigned long transferred = 0;
2252
2253         ENTRY;
2254
2255         rc = osc_brw_fini_request(req, rc);
2256         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2257
2258         /* restore clear text pages */
2259         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2260
2261         /*
2262          * When server returns -EINPROGRESS, client should always retry
2263          * regardless of the number of times the bulk was resent already.
2264          */
2265         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2266                 if (req->rq_import_generation !=
2267                     req->rq_import->imp_generation) {
2268                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2269                                ""DOSTID", rc = %d.\n",
2270                                req->rq_import->imp_obd->obd_name,
2271                                POSTID(&aa->aa_oa->o_oi), rc);
2272                 } else if (rc == -EINPROGRESS ||
2273                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2274                         rc = osc_brw_redo_request(req, aa, rc);
2275                 } else {
2276                         CERROR("%s: too many resent retries for object: "
2277                                "%llu:%llu, rc = %d.\n",
2278                                req->rq_import->imp_obd->obd_name,
2279                                POSTID(&aa->aa_oa->o_oi), rc);
2280                 }
2281
2282                 if (rc == 0)
2283                         RETURN(0);
2284                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2285                         rc = -EIO;
2286         }
2287
2288         if (rc == 0) {
2289                 struct obdo *oa = aa->aa_oa;
2290                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2291                 unsigned long valid = 0;
2292                 struct cl_object *obj;
2293                 struct osc_async_page *last;
2294
2295                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2296                 obj = osc2cl(last->oap_obj);
2297
2298                 cl_object_attr_lock(obj);
2299                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2300                         attr->cat_blocks = oa->o_blocks;
2301                         valid |= CAT_BLOCKS;
2302                 }
2303                 if (oa->o_valid & OBD_MD_FLMTIME) {
2304                         attr->cat_mtime = oa->o_mtime;
2305                         valid |= CAT_MTIME;
2306                 }
2307                 if (oa->o_valid & OBD_MD_FLATIME) {
2308                         attr->cat_atime = oa->o_atime;
2309                         valid |= CAT_ATIME;
2310                 }
2311                 if (oa->o_valid & OBD_MD_FLCTIME) {
2312                         attr->cat_ctime = oa->o_ctime;
2313                         valid |= CAT_CTIME;
2314                 }
2315
2316                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2317                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2318                         loff_t last_off = last->oap_count + last->oap_obj_off +
2319                                 last->oap_page_off;
2320
2321                         /* Change file size if this is an out of quota or
2322                          * direct IO write and it extends the file size */
2323                         if (loi->loi_lvb.lvb_size < last_off) {
2324                                 attr->cat_size = last_off;
2325                                 valid |= CAT_SIZE;
2326                         }
2327                         /* Extend KMS if it's not a lockless write */
2328                         if (loi->loi_kms < last_off &&
2329                             oap2osc_page(last)->ops_srvlock == 0) {
2330                                 attr->cat_kms = last_off;
2331                                 valid |= CAT_KMS;
2332                         }
2333                 }
2334
2335                 if (valid != 0)
2336                         cl_object_attr_update(env, obj, attr, valid);
2337                 cl_object_attr_unlock(obj);
2338         }
2339         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2340         aa->aa_oa = NULL;
2341
2342         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2343                 osc_inc_unstable_pages(req);
2344
2345         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2346                 list_del_init(&ext->oe_link);
2347                 osc_extent_finish(env, ext, 1,
2348                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2349         }
2350         LASSERT(list_empty(&aa->aa_exts));
2351         LASSERT(list_empty(&aa->aa_oaps));
2352
2353         transferred = (req->rq_bulk == NULL ? /* short io */
2354                        aa->aa_requested_nob :
2355                        req->rq_bulk->bd_nob_transferred);
2356
2357         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2358         ptlrpc_lprocfs_brw(req, transferred);
2359
2360         spin_lock(&cli->cl_loi_list_lock);
2361         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2362          * is called so we know whether to go to sync BRWs or wait for more
2363          * RPCs to complete */
2364         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2365                 cli->cl_w_in_flight--;
2366         else
2367                 cli->cl_r_in_flight--;
2368         osc_wake_cache_waiters(cli);
2369         spin_unlock(&cli->cl_loi_list_lock);
2370
2371         osc_io_unplug(env, cli, NULL);
2372         RETURN(rc);
2373 }
2374
2375 static void brw_commit(struct ptlrpc_request *req)
2376 {
2377         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2378          * this function being called via rq_commit_cb, we need to ensure
2379          * osc_dec_unstable_pages is still called. Otherwise unstable
2380          * pages may be leaked. */
2381         spin_lock(&req->rq_lock);
2382         if (likely(req->rq_unstable)) {
2383                 req->rq_unstable = 0;
2384                 spin_unlock(&req->rq_lock);
2385
2386                 osc_dec_unstable_pages(req);
2387         } else {
2388                 req->rq_committed = 1;
2389                 spin_unlock(&req->rq_lock);
2390         }
2391 }
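
/* Sketch of the race handled above: if the commit callback fires before
 * brw_interpret() has marked the request unstable, rq_unstable is still 0
 * here, so rq_committed is recorded and the decrement is left to the
 * unstable-pages accounting path; if rq_unstable was already set, it is
 * cleared and osc_dec_unstable_pages() is called right here. Either way
 * the increment is paired with exactly one decrement.
 */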
2392
2393 /**
2394  * Build an RPC from the list of extents @ext_list. The caller must ensure
2395  * that the total number of pages in this list does NOT exceed the maximum
2396  * pages per RPC. Extents in the list must be in OES_RPC state.
2397  */
2398 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2399                   struct list_head *ext_list, int cmd)
2400 {
2401         struct ptlrpc_request           *req = NULL;
2402         struct osc_extent               *ext;
2403         struct brw_page                 **pga = NULL;
2404         struct osc_brw_async_args       *aa = NULL;
2405         struct obdo                     *oa = NULL;
2406         struct osc_async_page           *oap;
2407         struct osc_object               *obj = NULL;
2408         struct cl_req_attr              *crattr = NULL;
2409         loff_t                          starting_offset = OBD_OBJECT_EOF;
2410         loff_t                          ending_offset = 0;
2411         /* '1' for consistency with code that checks !mpflag to restore */
2412         int mpflag = 1;
2413         int                             mem_tight = 0;
2414         int                             page_count = 0;
2415         bool                            soft_sync = false;
2416         bool                            ndelay = false;
2417         int                             i;
2418         int                             grant = 0;
2419         int                             rc;
2420         __u32                           layout_version = 0;
2421         LIST_HEAD(rpc_list);
2422         struct ost_body                 *body;
2423         ENTRY;
2424         LASSERT(!list_empty(ext_list));
2425
2426         /* add pages into rpc_list to build BRW rpc */
2427         list_for_each_entry(ext, ext_list, oe_link) {
2428                 LASSERT(ext->oe_state == OES_RPC);
2429                 mem_tight |= ext->oe_memalloc;
2430                 grant += ext->oe_grants;
2431                 page_count += ext->oe_nr_pages;
2432                 layout_version = max(layout_version, ext->oe_layout_version);
2433                 if (obj == NULL)
2434                         obj = ext->oe_obj;
2435         }
2436
2437         soft_sync = osc_over_unstable_soft_limit(cli);
2438         if (mem_tight)
2439                 mpflag = memalloc_noreclaim_save();
2440
2441         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2442         if (pga == NULL)
2443                 GOTO(out, rc = -ENOMEM);
2444
2445         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2446         if (oa == NULL)
2447                 GOTO(out, rc = -ENOMEM);
2448
2449         i = 0;
2450         list_for_each_entry(ext, ext_list, oe_link) {
2451                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2452                         if (mem_tight)
2453                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2454                         if (soft_sync)
2455                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2456                         pga[i] = &oap->oap_brw_page;
2457                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2458                         i++;
2459
2460                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2461                         if (starting_offset == OBD_OBJECT_EOF ||
2462                             starting_offset > oap->oap_obj_off)
2463                                 starting_offset = oap->oap_obj_off;
2464                         else
2465                                 LASSERT(oap->oap_page_off == 0);
2466                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2467                                 ending_offset = oap->oap_obj_off +
2468                                                 oap->oap_count;
2469                         else
2470                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2471                                         PAGE_SIZE);
2472                 }
2473                 if (ext->oe_ndelay)
2474                         ndelay = true;
2475         }
2476
2477         /* first page in the list */
2478         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2479
2480         crattr = &osc_env_info(env)->oti_req_attr;
2481         memset(crattr, 0, sizeof(*crattr));
2482         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2483         crattr->cra_flags = ~0ULL;
2484         crattr->cra_page = oap2cl_page(oap);
2485         crattr->cra_oa = oa;
2486         cl_req_attr_set(env, osc2cl(obj), crattr);
2487
2488         if (cmd == OBD_BRW_WRITE) {
2489                 oa->o_grant_used = grant;
2490                 if (layout_version > 0) {
2491                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2492                                PFID(&oa->o_oi.oi_fid), layout_version);
2493
2494                         oa->o_layout_version = layout_version;
2495                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2496                 }
2497         }
2498
2499         sort_brw_pages(pga, page_count);
2500         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2501         if (rc != 0) {
2502                 CERROR("prep_req failed: %d\n", rc);
2503                 GOTO(out, rc);
2504         }
2505
2506         req->rq_commit_cb = brw_commit;
2507         req->rq_interpret_reply = brw_interpret;
2508         req->rq_memalloc = mem_tight != 0;
2509         oap->oap_request = ptlrpc_request_addref(req);
2510         if (ndelay) {
2511                 req->rq_no_resend = req->rq_no_delay = 1;
2512                 /* we should probably set a shorter timeout value here,
2513                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2514                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2515         }
2516
2517         /* Need to update the timestamps after the request is built in case
2518          * we race with setattr (locally or in queue at the OST).  If the OST
2519          * gets a later setattr before an earlier BRW (as determined by the
2520          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2521          * is no obvious way to do this in a single call.  bug 10150 */
2522         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2523         crattr->cra_oa = &body->oa;
2524         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2525         cl_req_attr_set(env, osc2cl(obj), crattr);
2526         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2527
2528         aa = ptlrpc_req_async_args(aa, req);
2529         INIT_LIST_HEAD(&aa->aa_oaps);
2530         list_splice_init(&rpc_list, &aa->aa_oaps);
2531         INIT_LIST_HEAD(&aa->aa_exts);
2532         list_splice_init(ext_list, &aa->aa_exts);
2533
2534         spin_lock(&cli->cl_loi_list_lock);
2535         starting_offset >>= PAGE_SHIFT;
2536         if (cmd == OBD_BRW_READ) {
2537                 cli->cl_r_in_flight++;
2538                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2539                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2540                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2541                                       starting_offset + 1);
2542         } else {
2543                 cli->cl_w_in_flight++;
2544                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2545                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2546                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2547                                       starting_offset + 1);
2548         }
2549         spin_unlock(&cli->cl_loi_list_lock);
2550
2551         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2552                   page_count, aa, cli->cl_r_in_flight,
2553                   cli->cl_w_in_flight);
2554         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2555
2556         ptlrpcd_add_req(req);
2557         rc = 0;
2558         EXIT;
2559
2560 out:
2561         if (mem_tight)
2562                 memalloc_noreclaim_restore(mpflag);
2563
2564         if (rc != 0) {
2565                 LASSERT(req == NULL);
2566
2567                 if (oa)
2568                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2569                 if (pga) {
2570                         osc_release_bounce_pages(pga, page_count);
2571                         osc_release_ppga(pga, page_count);
2572                 }
2573                 /* this should happen rarely and is pretty bad; it makes the
2574                  * pending list not follow the dirty order */
2575                 while (!list_empty(ext_list)) {
2576                         ext = list_entry(ext_list->next, struct osc_extent,
2577                                          oe_link);
2578                         list_del_init(&ext->oe_link);
2579                         osc_extent_finish(env, ext, 0, rc);
2580                 }
2581         }
2582         RETURN(rc);
2583 }
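
/* The overall shape of a BRW RPC build, as implemented above (a summary,
 * not additional behaviour):
 *
 *   1. walk ext_list, collecting pages into pga[] plus grant/flags info;
 *   2. sort_brw_pages() so the target sees offsets in order;
 *   3. osc_brw_prep_request() packs the request (bulk or short io);
 *   4. hook up brw_commit()/brw_interpret() and hand the request to
 *      ptlrpcd via ptlrpcd_add_req().
 */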
2584
2585 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2586 {
2587         int set = 0;
2588
2589         LASSERT(lock != NULL);
2590
2591         lock_res_and_lock(lock);
2592
2593         if (lock->l_ast_data == NULL)
2594                 lock->l_ast_data = data;
2595         if (lock->l_ast_data == data)
2596                 set = 1;
2597
2598         unlock_res_and_lock(lock);
2599
2600         return set;
2601 }
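
/* osc_set_lock_data() is effectively a compare-and-set on l_ast_data under
 * the lock's resource lock: it returns 1 if l_ast_data was NULL (and is now
 * set to @data) or already equal to @data, and 0 if the lock is owned by
 * some other object, letting callers decide whether a DLM lock can be
 * reused for a given osc_object.
 */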
2602
2603 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2604                      void *cookie, struct lustre_handle *lockh,
2605                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2606                      int errcode)
2607 {
2608         bool intent = *flags & LDLM_FL_HAS_INTENT;
2609         int rc;
2610         ENTRY;
2611
2612         /* The request was created before the ldlm_cli_enqueue call. */
2613         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2614                 struct ldlm_reply *rep;
2615
2616                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2617                 LASSERT(rep != NULL);
2618
2619                 rep->lock_policy_res1 =
2620                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2621                 if (rep->lock_policy_res1)
2622                         errcode = rep->lock_policy_res1;
2623                 if (!speculative)
2624                         *flags |= LDLM_FL_LVB_READY;
2625         } else if (errcode == ELDLM_OK) {
2626                 *flags |= LDLM_FL_LVB_READY;
2627         }
2628
2629         /* Call the update callback. */
2630         rc = (*upcall)(cookie, lockh, errcode);
2631
2632         /* release the reference taken in ldlm_cli_enqueue() */
2633         if (errcode == ELDLM_LOCK_MATCHED)
2634                 errcode = ELDLM_OK;
2635         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2636                 ldlm_lock_decref(lockh, mode);
2637
2638         RETURN(rc);
2639 }
2640
2641 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2642                           void *args, int rc)
2643 {
2644         struct osc_enqueue_args *aa = args;
2645         struct ldlm_lock *lock;
2646         struct lustre_handle *lockh = &aa->oa_lockh;
2647         enum ldlm_mode mode = aa->oa_mode;
2648         struct ost_lvb *lvb = aa->oa_lvb;
2649         __u32 lvb_len = sizeof(*lvb);
2650         __u64 flags = 0;
2651
2652         ENTRY;
2653
2654         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2655          * be valid. */
2656         lock = ldlm_handle2lock(lockh);
2657         LASSERTF(lock != NULL,
2658                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2659                  lockh->cookie, req, aa);
2660
2661         /* Take an additional reference so that a blocking AST that
2662          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2663          * to arrive after the upcall has been executed by
2664          * osc_enqueue_fini(). */
2665         ldlm_lock_addref(lockh, mode);
2666
2667         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2668         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2669
2670         /* Let the CP AST grant the lock first. */
2671         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2672
2673         if (aa->oa_speculative) {
2674                 LASSERT(aa->oa_lvb == NULL);
2675                 LASSERT(aa->oa_flags == NULL);
2676                 aa->oa_flags = &flags;
2677         }
2678
2679         /* Complete obtaining the lock procedure. */
2680         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2681                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2682                                    lockh, rc);
2683         /* Complete osc stuff. */
2684         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2685                               aa->oa_flags, aa->oa_speculative, rc);
2686
2687         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2688
2689         ldlm_lock_decref(lockh, mode);
2690         LDLM_LOCK_PUT(lock);
2691         RETURN(rc);
2692 }
2693
2694 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2695  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2696  * other synchronous requests; however, holding some locks while trying to
2697  * obtain others may take a considerable amount of time in case of OST failure,
2698  * and when other sync requests cannot get a lock released by a client, that
2699  * client is evicted from the cluster -- such scenarios make life difficult, so
2700  * release locks just after they are obtained. */
2701 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2702                      __u64 *flags, union ldlm_policy_data *policy,
2703                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2704                      void *cookie, struct ldlm_enqueue_info *einfo,
2705                      struct ptlrpc_request_set *rqset, int async,
2706                      bool speculative)
2707 {
2708         struct obd_device *obd = exp->exp_obd;
2709         struct lustre_handle lockh = { 0 };
2710         struct ptlrpc_request *req = NULL;
2711         int intent = *flags & LDLM_FL_HAS_INTENT;
2712         __u64 match_flags = *flags;
2713         enum ldlm_mode mode;
2714         int rc;
2715         ENTRY;
2716
2717         /* Filesystem lock extents are extended to page boundaries so that
2718          * dealing with the page cache is a little smoother.  */
2719         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2720         policy->l_extent.end |= ~PAGE_MASK;
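        /* Illustrative example (assuming PAGE_SIZE == 4096, so ~PAGE_MASK is
         * 0xfff): a byte range [5000, 6000] becomes [4096, 8191] -- start is
         * rounded down to its page boundary and end up to the last byte of
         * its page. */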
2721
2722         /* Next, search for already existing extent locks that will cover us */
2723         /* If we're trying to read, we also search for an existing PW lock.  The
2724          * VFS and page cache already protect us locally, so lots of readers/
2725          * writers can share a single PW lock.
2726          *
2727          * There are problems with conversion deadlocks, so instead of
2728          * converting a read lock to a write lock, we'll just enqueue a new
2729          * one.
2730          *
2731          * At some point we should cancel the read lock instead of making them
2732          * send us a blocking callback, but there are problems with canceling
2733          * locks out from other users right now, too. */
2734         mode = einfo->ei_mode;
2735         if (einfo->ei_mode == LCK_PR)
2736                 mode |= LCK_PW;
2737         /* Normal lock requests must wait for the LVB to be ready before
2738          * matching a lock; speculative lock requests do not need to,
2739          * because they will not actually use the lock. */
2740         if (!speculative)
2741                 match_flags |= LDLM_FL_LVB_READY;
2742         if (intent != 0)
2743                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2744         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2745                                einfo->ei_type, policy, mode, &lockh);
2746         if (mode) {
2747                 struct ldlm_lock *matched;
2748
2749                 if (*flags & LDLM_FL_TEST_LOCK)
2750                         RETURN(ELDLM_OK);
2751
2752                 matched = ldlm_handle2lock(&lockh);
2753                 if (speculative) {
2754                         /* This DLM lock request is speculative, and does not
2755                          * have an associated IO request. Therefore, if there
2756                          * is already a DLM lock, it will just inform the
2757                          * caller to cancel the request for this stripe. */
2758                         lock_res_and_lock(matched);
2759                         if (ldlm_extent_equal(&policy->l_extent,
2760                             &matched->l_policy_data.l_extent))
2761                                 rc = -EEXIST;
2762                         else
2763                                 rc = -ECANCELED;
2764                         unlock_res_and_lock(matched);
2765
2766                         ldlm_lock_decref(&lockh, mode);
2767                         LDLM_LOCK_PUT(matched);
2768                         RETURN(rc);
2769                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2770                         *flags |= LDLM_FL_LVB_READY;
2771
2772                         /* We already have a lock, and it's referenced. */
2773                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2774
2775                         ldlm_lock_decref(&lockh, mode);
2776                         LDLM_LOCK_PUT(matched);
2777                         RETURN(ELDLM_OK);
2778                 } else {
2779                         ldlm_lock_decref(&lockh, mode);
2780                         LDLM_LOCK_PUT(matched);
2781                 }
2782         }
2783
2784         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2785                 RETURN(-ENOLCK);
2786
2787         if (intent) {
2788                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2789                                            &RQF_LDLM_ENQUEUE_LVB);
2790                 if (req == NULL)
2791                         RETURN(-ENOMEM);
2792
2793                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2794                 if (rc) {
2795                         ptlrpc_request_free(req);
2796                         RETURN(rc);
2797                 }
2798
2799                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2800                                      sizeof(*lvb));
2801                 ptlrpc_request_set_replen(req);
2802         }
2803
2804         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2805         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2806
2807         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2808                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2809         if (async) {
2810                 if (!rc) {
2811                         struct osc_enqueue_args *aa;
2812                         aa = ptlrpc_req_async_args(aa, req);
2813                         aa->oa_exp         = exp;
2814                         aa->oa_mode        = einfo->ei_mode;
2815                         aa->oa_type        = einfo->ei_type;
2816                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2817                         aa->oa_upcall      = upcall;
2818                         aa->oa_cookie      = cookie;
2819                         aa->oa_speculative = speculative;
2820                         if (!speculative) {
2821                                 aa->oa_flags  = flags;
2822                                 aa->oa_lvb    = lvb;
2823                         } else {
2824                                 /* Speculative locks essentially enqueue a
2825                                  * DLM lock in advance, so we don't care
2826                                  * about the result of the enqueue. */
2827                                 aa->oa_lvb    = NULL;
2828                                 aa->oa_flags  = NULL;
2829                         }
2830
2831                         req->rq_interpret_reply = osc_enqueue_interpret;
2832                         ptlrpc_set_add_req(rqset, req);
2833                 } else if (intent) {
2834                         ptlrpc_req_finished(req);
2835                 }
2836                 RETURN(rc);
2837         }
2838
2839         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2840                               flags, speculative, rc);
2841         if (intent)
2842                 ptlrpc_req_finished(req);
2843
2844         RETURN(rc);
2845 }
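/* A minimal caller sketch (hypothetical, for illustration only; the real
 * callers live in the osc lock layer): fill an ldlm_enqueue_info with
 * LDLM_EXTENT type plus completion/blocking ASTs, then call:
 *
 *      rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb,
 *                            my_upcall, my_cookie, &einfo, rqset,
 *                            async, speculative);
 *
 * For an async enqueue the upcall runs from osc_enqueue_interpret() once
 * the reply arrives; for a sync one it runs via osc_enqueue_fini() before
 * this function returns. */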
2846
2847 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2848                    struct ldlm_res_id *res_id, enum ldlm_type type,
2849                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2850                    __u64 *flags, struct osc_object *obj,
2851                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2852 {
2853         struct obd_device *obd = exp->exp_obd;
2854         __u64 lflags = *flags;
2855         enum ldlm_mode rc;
2856         ENTRY;
2857
2858         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2859                 RETURN(-EIO);
2860
2861         /* Filesystem lock extents are extended to page boundaries so that
2862          * dealing with the page cache is a little smoother */
2863         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2864         policy->l_extent.end |= ~PAGE_MASK;
2865
2866         /* Next, search for already existing extent locks that will cover us */
2867         /* If we're trying to read, we also search for an existing PW lock.  The
2868          * VFS and page cache already protect us locally, so lots of readers/
2869          * writers can share a single PW lock. */
2870         rc = mode;
2871         if (mode == LCK_PR)
2872                 rc |= LCK_PW;
2873
2874         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2875                                         res_id, type, policy, rc, lockh,
2876                                         match_flags);
2877         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2878                 RETURN(rc);
2879
2880         if (obj != NULL) {
2881                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2882
2883                 LASSERT(lock != NULL);
2884                 if (osc_set_lock_data(lock, obj)) {
2885                         lock_res_and_lock(lock);
2886                         if (!ldlm_is_lvb_cached(lock)) {
2887                                 LASSERT(lock->l_ast_data == obj);
2888                                 osc_lock_lvb_update(env, obj, lock, NULL);
2889                                 ldlm_set_lvb_cached(lock);
2890                         }
2891                         unlock_res_and_lock(lock);
2892                 } else {
2893                         ldlm_lock_decref(lockh, rc);
2894                         rc = 0;
2895                 }
2896                 LDLM_LOCK_PUT(lock);
2897         }
2898         RETURN(rc);
2899 }
2900
2901 static int osc_statfs_interpret(const struct lu_env *env,
2902                                 struct ptlrpc_request *req, void *args, int rc)
2903 {
2904         struct osc_async_args *aa = args;
2905         struct obd_statfs *msfs;
2906
2907         ENTRY;
2908         if (rc == -EBADR)
2909                 /*
2910                  * The request has in fact never been sent due to issues at
2911                  * a higher level (LOV).  Exit immediately since the caller
2912                  * is aware of the problem and takes care of the clean up.
2913                  */
2914                 RETURN(rc);
2915
2916         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2917             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2918                 GOTO(out, rc = 0);
2919
2920         if (rc != 0)
2921                 GOTO(out, rc);
2922
2923         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2924         if (msfs == NULL)
2925                 GOTO(out, rc = -EPROTO);
2926
2927         *aa->aa_oi->oi_osfs = *msfs;
2928 out:
2929         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2930
2931         RETURN(rc);
2932 }
2933
2934 static int osc_statfs_async(struct obd_export *exp,
2935                             struct obd_info *oinfo, time64_t max_age,
2936                             struct ptlrpc_request_set *rqset)
2937 {
2938         struct obd_device     *obd = class_exp2obd(exp);
2939         struct ptlrpc_request *req;
2940         struct osc_async_args *aa;
2941         int rc;
2942         ENTRY;
2943
2944         if (obd->obd_osfs_age >= max_age) {
2945                 CDEBUG(D_SUPER,
2946                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2947                        obd->obd_name, &obd->obd_osfs,
2948                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2949                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2950                 spin_lock(&obd->obd_osfs_lock);
2951                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2952                 spin_unlock(&obd->obd_osfs_lock);
2953                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2954                 if (oinfo->oi_cb_up)
2955                         oinfo->oi_cb_up(oinfo, 0);
2956
2957                 RETURN(0);
2958         }
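        /* Note: callers typically pass max_age as "now minus cache window"
         * (e.g. ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS), so the
         * cached obd_osfs above is reused whenever it is newer than the
         * oldest data the caller will accept. */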
2959
2960         /* We could possibly pass max_age in the request (as an absolute
2961          * timestamp or a "seconds.usec ago") so the target can avoid doing
2962          * extra calls into the filesystem if that isn't necessary (e.g.
2963          * during mount that would help a bit).  Having relative timestamps
2964          * is not so great if request processing is slow, while absolute
2965          * timestamps are not ideal because they need time synchronization. */
2966         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2967         if (req == NULL)
2968                 RETURN(-ENOMEM);
2969
2970         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2971         if (rc) {
2972                 ptlrpc_request_free(req);
2973                 RETURN(rc);
2974         }
2975         ptlrpc_request_set_replen(req);
2976         req->rq_request_portal = OST_CREATE_PORTAL;
2977         ptlrpc_at_set_req_timeout(req);
2978
2979         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2980                 /* procfs requests should not wait for statfs, to avoid deadlock */
2981                 req->rq_no_resend = 1;
2982                 req->rq_no_delay = 1;
2983         }
2984
2985         req->rq_interpret_reply = osc_statfs_interpret;
2986         aa = ptlrpc_req_async_args(aa, req);
2987         aa->aa_oi = oinfo;
2988
2989         ptlrpc_set_add_req(rqset, req);
2990         RETURN(0);
2991 }
2992
2993 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2994                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2995 {
2996         struct obd_device     *obd = class_exp2obd(exp);
2997         struct obd_statfs     *msfs;
2998         struct ptlrpc_request *req;
2999         struct obd_import     *imp = NULL;
3000         int rc;
3001         ENTRY;
3002
3003
3004         /* Since the request might also come from lprocfs, we need to
3005          * sync this with client_disconnect_export() (Bug 15684). */
3006         down_read(&obd->u.cli.cl_sem);
3007         if (obd->u.cli.cl_import)
3008                 imp = class_import_get(obd->u.cli.cl_import);
3009         up_read(&obd->u.cli.cl_sem);
3010         if (!imp)
3011                 RETURN(-ENODEV);
3012
3013         /* We could possibly pass max_age in the request (as an absolute
3014          * timestamp or a "seconds.usec ago") so the target can avoid doing
3015          * extra calls into the filesystem if that isn't necessary (e.g.
3016          * during mount that would help a bit).  Having relative timestamps
3017          * is not so great if request processing is slow, while absolute
3018          * timestamps are not ideal because they need time synchronization. */
3019         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3020
3021         class_import_put(imp);
3022
3023         if (req == NULL)
3024                 RETURN(-ENOMEM);
3025
3026         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3027         if (rc) {
3028                 ptlrpc_request_free(req);
3029                 RETURN(rc);
3030         }
3031         ptlrpc_request_set_replen(req);
3032         req->rq_request_portal = OST_CREATE_PORTAL;
3033         ptlrpc_at_set_req_timeout(req);
3034
3035         if (flags & OBD_STATFS_NODELAY) {
3036                 /* procfs requests should not wait for statfs, to avoid deadlock */
3037                 req->rq_no_resend = 1;
3038                 req->rq_no_delay = 1;
3039         }
3040
3041         rc = ptlrpc_queue_wait(req);
3042         if (rc)
3043                 GOTO(out, rc);
3044
3045         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3046         if (msfs == NULL)
3047                 GOTO(out, rc = -EPROTO);
3048
3049         *osfs = *msfs;
3050
3051         EXIT;
3052 out:
3053         ptlrpc_req_finished(req);
3054         return rc;
3055 }
3056
3057 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3058                          void *karg, void __user *uarg)
3059 {
3060         struct obd_device *obd = exp->exp_obd;
3061         struct obd_ioctl_data *data = karg;
3062         int rc = 0;
3063
3064         ENTRY;
3065         if (!try_module_get(THIS_MODULE)) {
3066                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3067                        module_name(THIS_MODULE));
3068                 return -EINVAL;
3069         }
3070         switch (cmd) {
3071         case OBD_IOC_CLIENT_RECOVER:
3072                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3073                                            data->ioc_inlbuf1, 0);
3074                 if (rc > 0)
3075                         rc = 0;
3076                 break;
3077         case IOC_OSC_SET_ACTIVE:
3078                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3079                                               data->ioc_offset);
3080                 break;
3081         default:
3082                 rc = -ENOTTY;
3083                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3084                        obd->obd_name, cmd, current->comm, rc);
3085                 break;
3086         }
3087
3088         module_put(THIS_MODULE);
3089         return rc;
3090 }
3091
3092 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3093                        u32 keylen, void *key, u32 vallen, void *val,
3094                        struct ptlrpc_request_set *set)
3095 {
3096         struct ptlrpc_request *req;
3097         struct obd_device     *obd = exp->exp_obd;
3098         struct obd_import     *imp = class_exp2cliimp(exp);
3099         char                  *tmp;
3100         int                    rc;
3101         ENTRY;
3102
3103         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3104
3105         if (KEY_IS(KEY_CHECKSUM)) {
3106                 if (vallen != sizeof(int))
3107                         RETURN(-EINVAL);
3108                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3109                 RETURN(0);
3110         }
3111
3112         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3113                 sptlrpc_conf_client_adapt(obd);
3114                 RETURN(0);
3115         }
3116
3117         if (KEY_IS(KEY_FLUSH_CTX)) {
3118                 sptlrpc_import_flush_my_ctx(imp);
3119                 RETURN(0);
3120         }
3121
3122         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3123                 struct client_obd *cli = &obd->u.cli;
3124                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3125                 long target = *(long *)val;
3126
3127                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3128                 *(long *)val -= nr;
3129                 RETURN(0);
3130         }
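        /* Worked example for the shrink path above (hypothetical numbers):
         * with 1000 pages in cl_lru_in_list and a caller target of 300, at
         * most min(500, 300) = 300 pages are shrunk, and the pages actually
         * freed are subtracted from the caller's remaining target. */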
3131
3132         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3133                 RETURN(-EINVAL);
3134
3135         /* We pass all other commands directly to OST. Since nobody calls osc
3136            methods directly and everybody is supposed to go through LOV, we
3137            assume LOV checked for invalid values for us.
3138            The only recognised values so far are evict_by_nid and mds_conn.
3139            Even if something bad goes through, we'd get a -EINVAL from OST
3140            anyway. */
3141
3142         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3143                                                 &RQF_OST_SET_GRANT_INFO :
3144                                                 &RQF_OBD_SET_INFO);
3145         if (req == NULL)
3146                 RETURN(-ENOMEM);
3147
3148         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3149                              RCL_CLIENT, keylen);
3150         if (!KEY_IS(KEY_GRANT_SHRINK))
3151                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3152                                      RCL_CLIENT, vallen);
3153         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3154         if (rc) {
3155                 ptlrpc_request_free(req);
3156                 RETURN(rc);
3157         }
3158
3159         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3160         memcpy(tmp, key, keylen);
3161         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3162                                                         &RMF_OST_BODY :
3163                                                         &RMF_SETINFO_VAL);
3164         memcpy(tmp, val, vallen);
3165
3166         if (KEY_IS(KEY_GRANT_SHRINK)) {
3167                 struct osc_grant_args *aa;
3168                 struct obdo *oa;
3169
3170                 aa = ptlrpc_req_async_args(aa, req);
3171                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3172                 if (!oa) {
3173                         ptlrpc_req_finished(req);
3174                         RETURN(-ENOMEM);
3175                 }
3176                 *oa = ((struct ost_body *)val)->oa;
3177                 aa->aa_oa = oa;
3178                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3179         }
3180
3181         ptlrpc_request_set_replen(req);
3182         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3183                 LASSERT(set != NULL);
3184                 ptlrpc_set_add_req(set, req);
3185                 ptlrpc_check_set(NULL, set);
3186         } else {
3187                 ptlrpcd_add_req(req);
3188         }
3189
3190         RETURN(0);
3191 }
3192 EXPORT_SYMBOL(osc_set_info_async);
3193
3194 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3195                   struct obd_device *obd, struct obd_uuid *cluuid,
3196                   struct obd_connect_data *data, void *localdata)
3197 {
3198         struct client_obd *cli = &obd->u.cli;
3199
3200         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3201                 long lost_grant;
3202                 long grant;
3203
3204                 spin_lock(&cli->cl_loi_list_lock);
3205                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3206                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3207                         /* restore ocd_grant_blkbits as client page bits */
3208                         data->ocd_grant_blkbits = PAGE_SHIFT;
3209                         grant += cli->cl_dirty_grant;
3210                 } else {
3211                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3212                 }
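                /* GCC "?:" extension: if no grant has accumulated locally,
                 * request a default of two BRW-sized chunks rather than
                 * zero. */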
3213                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3214                 lost_grant = cli->cl_lost_grant;
3215                 cli->cl_lost_grant = 0;
3216                 spin_unlock(&cli->cl_loi_list_lock);
3217
3218                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3219                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3220                        data->ocd_version, data->ocd_grant, lost_grant);
3221         }
3222
3223         RETURN(0);
3224 }
3225 EXPORT_SYMBOL(osc_reconnect);
3226
3227 int osc_disconnect(struct obd_export *exp)
3228 {
3229         struct obd_device *obd = class_exp2obd(exp);
3230         int rc;
3231
3232         rc = client_disconnect_export(exp);
3233         /**
3234          * Initially we put del_shrink_grant before disconnect_export, but it
3235          * causes the following problem if setup (connect) and cleanup
3236          * (disconnect) are tangled together.
3237          *      connect p1                     disconnect p2
3238          *   ptlrpc_connect_import
3239          *     ...............               class_manual_cleanup
3240          *                                     osc_disconnect
3241          *                                     del_shrink_grant
3242          *   ptlrpc_connect_interpret
3243          *     osc_init_grant
3244          *   add this client to shrink list
3245          *                                      cleanup_osc
3246          * Bang! The grant shrink thread triggers the shrink (BUG 18662).
3247          */
3248         osc_del_grant_list(&obd->u.cli);
3249         return rc;
3250 }
3251 EXPORT_SYMBOL(osc_disconnect);
3252
3253 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3254                                  struct hlist_node *hnode, void *arg)
3255 {
3256         struct lu_env *env = arg;
3257         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3258         struct ldlm_lock *lock;
3259         struct osc_object *osc = NULL;
3260         ENTRY;
3261
3262         lock_res(res);
3263         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3264                 if (lock->l_ast_data != NULL && osc == NULL) {
3265                         osc = lock->l_ast_data;
3266                         cl_object_get(osc2cl(osc));
3267                 }
3268
3269                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock
3270                  * will be canceled by the 2nd ldlm_namespace_cleanup()
3271                  * call in osc_import_event(). */
3272                 ldlm_clear_cleaned(lock);
3273         }
3274         unlock_res(res);
3275
3276         if (osc != NULL) {
3277                 osc_object_invalidate(env, osc);
3278                 cl_object_put(env, osc2cl(osc));
3279         }
3280
3281         RETURN(0);
3282 }
3283 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3284
3285 static int osc_import_event(struct obd_device *obd,
3286                             struct obd_import *imp,
3287                             enum obd_import_event event)
3288 {
3289         struct client_obd *cli;
3290         int rc = 0;
3291
3292         ENTRY;
3293         LASSERT(imp->imp_obd == obd);
3294
3295         switch (event) {
3296         case IMP_EVENT_DISCON: {
3297                 cli = &obd->u.cli;
3298                 spin_lock(&cli->cl_loi_list_lock);
3299                 cli->cl_avail_grant = 0;
3300                 cli->cl_lost_grant = 0;
3301                 spin_unlock(&cli->cl_loi_list_lock);
3302                 break;
3303         }
3304         case IMP_EVENT_INACTIVE: {
3305                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3306                 break;
3307         }
3308         case IMP_EVENT_INVALIDATE: {
3309                 struct ldlm_namespace *ns = obd->obd_namespace;
3310                 struct lu_env         *env;
3311                 __u16                  refcheck;
3312
3313                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3314
3315                 env = cl_env_get(&refcheck);
3316                 if (!IS_ERR(env)) {
3317                         osc_io_unplug(env, &obd->u.cli, NULL);
3318
3319                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3320                                                  osc_ldlm_resource_invalidate,
3321                                                  env, 0);
3322                         cl_env_put(env, &refcheck);
3323
3324                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3325                 } else
3326                         rc = PTR_ERR(env);
3327                 break;
3328         }
3329         case IMP_EVENT_ACTIVE: {
3330                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3331                 break;
3332         }
3333         case IMP_EVENT_OCD: {
3334                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3335
3336                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3337                         osc_init_grant(&obd->u.cli, ocd);
3338
3339                 /* See bug 7198 */
3340                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3341                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3342
3343                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3344                 break;
3345         }
3346         case IMP_EVENT_DEACTIVATE: {
3347                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3348                 break;
3349         }
3350         case IMP_EVENT_ACTIVATE: {
3351                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3352                 break;
3353         }
3354         default:
3355                 CERROR("Unknown import event %d\n", event);
3356                 LBUG();
3357         }
3358         RETURN(rc);
3359 }
3360
3361 /**
3362  * Determine whether the lock can be canceled before replaying the lock
3363  * during recovery, see bug16774 for detailed information.
3364  *
3365  * \retval zero the lock can't be canceled
3366  * \retval other ok to cancel
3367  */
3368 static int osc_cancel_weight(struct ldlm_lock *lock)
3369 {
3370         /*
3371          * Cancel all unused and granted extent locks.
3372          */
3373         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3374             ldlm_is_granted(lock) &&
3375             osc_ldlm_weigh_ast(lock) == 0)
3376                 RETURN(1);
3377
3378         RETURN(0);
3379 }
3380
3381 static int brw_queue_work(const struct lu_env *env, void *data)
3382 {
3383         struct client_obd *cli = data;
3384
3385         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3386
3387         osc_io_unplug(env, cli, NULL);
3388         RETURN(0);
3389 }
3390
3391 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3392 {
3393         struct client_obd *cli = &obd->u.cli;
3394         void *handler;
3395         int rc;
3396
3397         ENTRY;
3398
3399         rc = ptlrpcd_addref();
3400         if (rc)
3401                 RETURN(rc);
3402
3403         rc = client_obd_setup(obd, lcfg);
3404         if (rc)
3405                 GOTO(out_ptlrpcd, rc);
3406
3407
3408         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3409         if (IS_ERR(handler))
3410                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3411         cli->cl_writeback_work = handler;
3412
3413         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3414         if (IS_ERR(handler))
3415                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3416         cli->cl_lru_work = handler;
3417
3418         rc = osc_quota_setup(obd);
3419         if (rc)
3420                 GOTO(out_ptlrpcd_work, rc);
3421
3422         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3423         osc_update_next_shrink(cli);
3424
3425         RETURN(rc);
3426
3427 out_ptlrpcd_work:
3428         if (cli->cl_writeback_work != NULL) {
3429                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3430                 cli->cl_writeback_work = NULL;
3431         }
3432         if (cli->cl_lru_work != NULL) {
3433                 ptlrpcd_destroy_work(cli->cl_lru_work);
3434                 cli->cl_lru_work = NULL;
3435         }
3436         client_obd_cleanup(obd);
3437 out_ptlrpcd:
3438         ptlrpcd_decref();
3439         RETURN(rc);
3440 }
3441 EXPORT_SYMBOL(osc_setup_common);
3442
3443 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3444 {
3445         struct client_obd *cli = &obd->u.cli;
3446         int                adding;
3447         int                added;
3448         int                req_count;
3449         int                rc;
3450
3451         ENTRY;
3452
3453         rc = osc_setup_common(obd, lcfg);
3454         if (rc < 0)
3455                 RETURN(rc);
3456
3457         rc = osc_tunables_init(obd);
3458         if (rc)
3459                 RETURN(rc);
3460
3461         /*
3462          * We try to control the total number of requests with an upper
3463          * limit, osc_reqpool_maxreqcount. There might be some race that
3464          * causes over-limit allocation, but it is acceptable.
3465          */
3466         req_count = atomic_read(&osc_pool_req_count);
3467         if (req_count < osc_reqpool_maxreqcount) {
3468                 adding = cli->cl_max_rpcs_in_flight + 2;
3469                 if (req_count + adding > osc_reqpool_maxreqcount)
3470                         adding = osc_reqpool_maxreqcount - req_count;
3471
3472                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3473                 atomic_add(added, &osc_pool_req_count);
3474         }
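        /* Illustrative sizing (hypothetical numbers): with a global cap
         * osc_reqpool_maxreqcount = 120, 100 requests already pooled and
         * cl_max_rpcs_in_flight = 8, this device tries to add 8 + 2 = 10
         * requests, which still fits under the cap. */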
3475
3476         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3477
3478         spin_lock(&osc_shrink_lock);
3479         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3480         spin_unlock(&osc_shrink_lock);
3481         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3482         cli->cl_import->imp_idle_debug = D_HA;
3483
3484         RETURN(0);
3485 }
3486
3487 int osc_precleanup_common(struct obd_device *obd)
3488 {
3489         struct client_obd *cli = &obd->u.cli;
3490         ENTRY;
3491
3492         /* LU-464
3493          * for echo client, export may be on zombie list, wait for
3494          * zombie thread to cull it, because cli.cl_import will be
3495          * cleared in client_disconnect_export():
3496          *   class_export_destroy() -> obd_cleanup() ->
3497          *   echo_device_free() -> echo_client_cleanup() ->
3498          *   obd_disconnect() -> osc_disconnect() ->
3499          *   client_disconnect_export()
3500          */
3501         obd_zombie_barrier();
3502         if (cli->cl_writeback_work) {
3503                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3504                 cli->cl_writeback_work = NULL;
3505         }
3506
3507         if (cli->cl_lru_work) {
3508                 ptlrpcd_destroy_work(cli->cl_lru_work);
3509                 cli->cl_lru_work = NULL;
3510         }
3511
3512         obd_cleanup_client_import(obd);
3513         RETURN(0);
3514 }
3515 EXPORT_SYMBOL(osc_precleanup_common);
3516
3517 static int osc_precleanup(struct obd_device *obd)
3518 {
3519         ENTRY;
3520
3521         osc_precleanup_common(obd);
3522
3523         ptlrpc_lprocfs_unregister_obd(obd);
3524         RETURN(0);
3525 }
3526
3527 int osc_cleanup_common(struct obd_device *obd)
3528 {
3529         struct client_obd *cli = &obd->u.cli;
3530         int rc;
3531
3532         ENTRY;
3533
3534         spin_lock(&osc_shrink_lock);
3535         list_del(&cli->cl_shrink_list);
3536         spin_unlock(&osc_shrink_lock);
3537
3538         /* lru cleanup */
3539         if (cli->cl_cache != NULL) {
3540                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3541                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3542                 list_del_init(&cli->cl_lru_osc);
3543                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3544                 cli->cl_lru_left = NULL;
3545                 cl_cache_decref(cli->cl_cache);
3546                 cli->cl_cache = NULL;
3547         }
3548
3549         /* free memory of osc quota cache */
3550         osc_quota_cleanup(obd);
3551
3552         rc = client_obd_cleanup(obd);
3553
3554         ptlrpcd_decref();
3555         RETURN(rc);
3556 }
3557 EXPORT_SYMBOL(osc_cleanup_common);
3558
3559 static const struct obd_ops osc_obd_ops = {
3560         .o_owner                = THIS_MODULE,
3561         .o_setup                = osc_setup,
3562         .o_precleanup           = osc_precleanup,
3563         .o_cleanup              = osc_cleanup_common,
3564         .o_add_conn             = client_import_add_conn,
3565         .o_del_conn             = client_import_del_conn,
3566         .o_connect              = client_connect_import,
3567         .o_reconnect            = osc_reconnect,
3568         .o_disconnect           = osc_disconnect,
3569         .o_statfs               = osc_statfs,
3570         .o_statfs_async         = osc_statfs_async,
3571         .o_create               = osc_create,
3572         .o_destroy              = osc_destroy,
3573         .o_getattr              = osc_getattr,
3574         .o_setattr              = osc_setattr,
3575         .o_iocontrol            = osc_iocontrol,
3576         .o_set_info_async       = osc_set_info_async,
3577         .o_import_event         = osc_import_event,
3578         .o_quotactl             = osc_quotactl,
3579 };
3580
3581 static struct shrinker *osc_cache_shrinker;
3582 LIST_HEAD(osc_shrink_list);
3583 DEFINE_SPINLOCK(osc_shrink_lock);
3584
3585 #ifndef HAVE_SHRINKER_COUNT
3586 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3587 {
3588         struct shrink_control scv = {
3589                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3590                 .gfp_mask   = shrink_param(sc, gfp_mask)
3591         };
3592         (void)osc_cache_shrink_scan(shrinker, &scv);
3593
3594         return osc_cache_shrink_count(shrinker, &scv);
3595 }
3596 #endif
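/* On kernels without the split count/scan shrinker API (HAVE_SHRINKER_COUNT
 * not defined), the legacy single-callback shim above emulates it: scan
 * first, then report the remaining object count back to the VM. */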
3597
3598 static int __init osc_init(void)
3599 {
3600         unsigned int reqpool_size;
3601         unsigned int reqsize;
3602         int rc;
3603         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3604                          osc_cache_shrink_count, osc_cache_shrink_scan);
3605         ENTRY;
3606
3607         /* Print the address of _any_ initialized kernel symbol from this
3608          * module, to allow debugging with a gdb that doesn't support data
3609          * symbols from modules. */
3610         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3611
3612         rc = lu_kmem_init(osc_caches);
3613         if (rc)
3614                 RETURN(rc);
3615
3616         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3617                                  LUSTRE_OSC_NAME, &osc_device_type);
3618         if (rc)
3619                 GOTO(out_kmem, rc);
3620
3621         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3622
3623         /* This is obviously too much memory; we only prevent overflow here */
3624         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3625                 GOTO(out_type, rc = -EINVAL);
3626
3627         reqpool_size = osc_reqpool_mem_max << 20;
3628
3629         reqsize = 1;
3630         while (reqsize < OST_IO_MAXREQSIZE)
3631                 reqsize = reqsize << 1;
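        /* reqsize is now the smallest power of two >= OST_IO_MAXREQSIZE;
         * e.g. a (hypothetical) 5 MiB maximum request size would round up
         * to 8 MiB, making reqpool_size / reqsize a conservative count. */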
3632
3633         /*
3634          * We don't enlarge the request count in the OSC pool according to
3635          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3636          * after a normal allocation has failed, so a small OSC pool won't
3637          * cause much performance degradation in most cases.
3638          */
3639         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3640
3641         atomic_set(&osc_pool_req_count, 0);
3642         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3643                                           ptlrpc_add_rqs_to_pool);
3644
3645         if (osc_rq_pool == NULL)
3646                 GOTO(out_type, rc = -ENOMEM);
3647
3648         rc = osc_start_grant_work();
3649         if (rc != 0)
3650                 GOTO(out_req_pool, rc);
3651
3652         RETURN(rc);
3653
3654 out_req_pool:
3655         ptlrpc_free_rq_pool(osc_rq_pool);
3656 out_type:
3657         class_unregister_type(LUSTRE_OSC_NAME);
3658 out_kmem:
3659         lu_kmem_fini(osc_caches);
3660
3661         RETURN(rc);
3662 }
3663
3664 static void __exit osc_exit(void)
3665 {
3666         osc_stop_grant_work();
3667         remove_shrinker(osc_cache_shrinker);
3668         class_unregister_type(LUSTRE_OSC_NAME);
3669         lu_kmem_fini(osc_caches);
3670         ptlrpc_free_rq_pool(osc_rq_pool);
3671 }
3672
3673 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3674 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3675 MODULE_VERSION(LUSTRE_VERSION_STRING);
3676 MODULE_LICENSE("GPL");
3677
3678 module_init(osc_init);
3679 module_exit(osc_exit);