LU-15608 sec: fix DIO for encrypted files
lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"
#include <lnet/lnet_rdma.h>

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

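/* Pack the obdo @oa into the OST_BODY field of @req, converting it to the
 * wire format negotiated in the import's connect data. */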
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

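/* Fetch object attributes from the OST with a synchronous OST_GETATTR RPC
 * and unpack the reply into @oa; the client's BRW size is reported back as
 * the object blocksize. */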
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

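/* Synchronously update object attributes on the OST with an OST_SETATTR RPC
 * and refresh @oa from the reply. */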
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

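/* Send an OST_SETATTR request without blocking. If @rqset is NULL the
 * request is simply handed to ptlrpcd; otherwise the reply is interpreted
 * by osc_setattr_interpret(), which invokes @upcall with @cookie. */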
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

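/* Create an object on the OST; only echo object sequences are expected
 * here, as asserted below. */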
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

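/* Send an asynchronous OST_PUNCH (truncate/punch) request on the IO portal.
 * The reply is handled by osc_setattr_interpret(), which invokes @upcall
 * with @cookie. */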
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked once the reply is processed
 * @cookie:     Caller context passed through to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation or the standard preallocate operation is supported
 * currently. Other mode flags are not supported yet. ftruncate(2) or
 * truncate(2) is supported via a SETATTR request.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

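/* Send an asynchronous OST_SYNC request for @obj. The range to flush is
 * carried in the size/blocks fields of @oa, as noted in the body below. */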
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally any locks matched by @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

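/* Destroy an object on the OST. Matching local PW locks are cancelled in
 * advance (ELC) with LDLM_FL_DISCARD_DATA, and the RPC is sent without
 * waiting for a reply, throttled to at most cl_max_rpcs_in_flight
 * concurrent destroys. */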
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

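/* Fill in the dirty, undirty, grant and dropped fields of @oa so that the
 * server can see this client's cache usage and adjust its grant
 * accordingly. */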
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}
EXPORT_SYMBOL(osc_update_next_shrink);

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

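/* Give a quarter of the currently available grant back to the server by
 * packing it into @oa with the OBD_FL_SHRINK_GRANT flag set. */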
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

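/* Decide whether this client's grant should be shrunk now: grant shrinking
 * must be supported and enabled on the import, the shrink deadline must be
 * (nearly) expired, and more than one RPC worth of grant must be held. */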
static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

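/* Periodic worker that walks the client list, sends at most
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, and re-arms itself for the
 * earliest pending cl_next_shrink_grant deadline. */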
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
EXPORT_SYMBOL(osc_schedule_grant_work);

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

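/* Check the per-niobuf return codes in a BRW_WRITE reply, and verify that
 * the number of bytes transferred by bulk matches what was requested. */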
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

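/* Two brw_pages can form a single niobuf only if their flags match and p2
 * starts exactly where p1 ends; a flag mismatch outside the known-safe bits
 * is reported as a warning. */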
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC |
                                  OBD_BRW_SYS_RESOURCE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum, bool resend)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        CDEBUG(D_PAGE | (resend ? D_HA : 0),
               "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
               guard_number, resend, nob, pg_count);

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (unlikely(resend))
                        CDEBUG(D_PAGE | D_HA,
                               "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
                               i, used, pga[i]->off & ~PAGE_MASK, count,
                               (int)(used * sizeof(*guard_start)),
                               guard_start + used_number);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

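/* Compute a checksum over the first @nob bytes of the bulk described by
 * @pga, using the algorithm selected by @cksum_type. Fault injection may
 * corrupt the data (reads) or the checksum (writes) for testing. */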
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum, bool resend)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum,
                                             resend);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

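/* Build an OST_READ/OST_WRITE request for the pages in @pga. For writes to
 * encrypted files with an available key, each page is first encrypted into
 * an llcrypt bounce page; for transient (direct IO) pages the inode mapping
 * and page index are temporarily substituted so that llcrypt can locate the
 * encryption context. */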
1391 static int
1392 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1393                      u32 page_count, struct brw_page **pga,
1394                      struct ptlrpc_request **reqp, int resend)
1395 {
1396         struct ptlrpc_request *req;
1397         struct ptlrpc_bulk_desc *desc;
1398         struct ost_body *body;
1399         struct obd_ioobj *ioobj;
1400         struct niobuf_remote *niobuf;
1401         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1402         struct osc_brw_async_args *aa;
1403         struct req_capsule *pill;
1404         struct brw_page *pg_prev;
1405         void *short_io_buf;
1406         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1407         struct inode *inode = NULL;
1408         bool directio = false;
1409         bool enable_checksum = true;
1410         struct cl_page *clpage;
1411
1412         ENTRY;
1413         if (pga[0]->pg) {
1414                 clpage = oap2cl_page(brw_page2oap(pga[0]));
1415                 inode = clpage->cp_inode;
1416                 if (clpage->cp_type == CPT_TRANSIENT)
1417                         directio = true;
1418         }
1419         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1420                 RETURN(-ENOMEM); /* Recoverable */
1421         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1422                 RETURN(-EINVAL); /* Fatal */
1423
1424         if ((cmd & OBD_BRW_WRITE) != 0) {
1425                 opc = OST_WRITE;
1426                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1427                                                 osc_rq_pool,
1428                                                 &RQF_OST_BRW_WRITE);
1429         } else {
1430                 opc = OST_READ;
1431                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1432         }
1433         if (req == NULL)
1434                 RETURN(-ENOMEM);
1435
1436         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1437             llcrypt_has_encryption_key(inode)) {
1438                 for (i = 0; i < page_count; i++) {
1439                         struct brw_page *brwpg = pga[i];
1440                         struct page *data_page = NULL;
1441                         bool retried = false;
1442                         bool lockedbymyself;
1443                         u32 nunits = (brwpg->off & ~PAGE_MASK) + brwpg->count;
1444                         struct address_space *map_orig = NULL;
1445                         pgoff_t index_orig;
1446
1447 retry_encrypt:
1448                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1449                         /* The page can already be locked when we arrive here.
1450                          * This is possible when cl_page_assume/vvp_page_assume
1451                          * is stuck on wait_on_page_writeback with page lock
1452                          * held. In this case there is no risk for the lock to
1453                          * be released while we are doing our encryption
1454                          * processing, because writeback against that page will
1455                          * end in vvp_page_completion_write/cl_page_completion,
1456                          * which means only once the page is fully processed.
1457                          */
1458                         lockedbymyself = trylock_page(brwpg->pg);
1459                         if (directio) {
1460                                 map_orig = brwpg->pg->mapping;
1461                                 brwpg->pg->mapping = inode->i_mapping;
1462                                 index_orig = brwpg->pg->index;
1463                                 clpage = oap2cl_page(brw_page2oap(brwpg));
1464                                 brwpg->pg->index = clpage->cp_page_index;
1465                         }
1466                         data_page =
1467                                 llcrypt_encrypt_pagecache_blocks(brwpg->pg,
1468                                                                  nunits, 0,
1469                                                                  GFP_NOFS);
1470                         if (directio) {
1471                                 brwpg->pg->mapping = map_orig;
1472                                 brwpg->pg->index = index_orig;
1473                         }
1474                         if (lockedbymyself)
1475                                 unlock_page(brwpg->pg);
1476                         if (IS_ERR(data_page)) {
1477                                 rc = PTR_ERR(data_page);
1478                                 if (rc == -ENOMEM && !retried) {
1479                                         retried = true;
1480                                         rc = 0;
1481                                         goto retry_encrypt;
1482                                 }
1483                                 ptlrpc_request_free(req);
1484                                 RETURN(rc);
1485                         }
1486                         /* Set PageChecked flag on bounce page for
1487                          * disambiguation in osc_release_bounce_pages().
1488                          */
1489                         SetPageChecked(data_page);
1490                         brwpg->pg = data_page;
1491                         /* there should be no gap in the middle of the page array */
1492                         if (i == page_count - 1) {
1493                                 struct osc_async_page *oap =
1494                                         brw_page2oap(brwpg);
1495
1496                                 oa->o_size = oap->oap_count +
1497                                         oap->oap_obj_off + oap->oap_page_off;
1498                         }
1499                         /* len is forced to nunits, and the in-page offset to 0,
1500                          * so store the old, clear-text values
1501                          */
1502                         brwpg->bp_count_diff = nunits - brwpg->count;
1503                         brwpg->count = nunits;
1504                         brwpg->bp_off_diff = brwpg->off & ~PAGE_MASK;
1505                         brwpg->off = brwpg->off & PAGE_MASK;
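                        /* Illustration (hypothetical values, assuming a
                         * 4096-byte encryption unit): a clear-text write of
                         * count=512 at off=9216 stores bp_off_diff=1024 and
                         * bp_count_diff=3584, and the page is sent with
                         * off=8192, count=4096.  The clear-text values are
                         * restored from these diffs when the bounce pages are
                         * released in osc_release_bounce_pages().
                         */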
1506                 }
1507         } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1508                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1509                 struct cl_page *clpage = oap2cl_page(oap);
1510                 struct cl_object *clobj = clpage->cp_obj;
1511                 struct cl_attr attr = { 0 };
1512                 struct lu_env *env;
1513                 __u16 refcheck;
1514
1515                 env = cl_env_get(&refcheck);
1516                 if (IS_ERR(env)) {
1517                         rc = PTR_ERR(env);
1518                         ptlrpc_request_free(req);
1519                         RETURN(rc);
1520                 }
1521
1522                 cl_object_attr_lock(clobj);
1523                 rc = cl_object_attr_get(env, clobj, &attr);
1524                 cl_object_attr_unlock(clobj);
1525                 cl_env_put(env, &refcheck);
1526                 if (rc != 0) {
1527                         ptlrpc_request_free(req);
1528                         RETURN(rc);
1529                 }
1530                 if (attr.cat_size)
1531                         oa->o_size = attr.cat_size;
1532         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1533                    llcrypt_has_encryption_key(inode)) {
1534                 for (i = 0; i < page_count; i++) {
1535                         struct brw_page *pg = pga[i];
1536                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1537
1538                         nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1539                         /* count/off are forced to cover whole encryption
1540                          * units, since that is how data is stored on the
1541                          * OST, so adjust bp_{count,off}_diff to remember
1542                          * the size of the clear text.
1543                          */
1544                         pg->bp_count_diff = nunits - pg->count;
1545                         pg->count = nunits;
1546                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1547                         pg->off = pg->off & PAGE_MASK;
1548                 }
1549         }
1550
1551         for (niocount = i = 1; i < page_count; i++) {
1552                 if (!can_merge_pages(pga[i - 1], pga[i]))
1553                         niocount++;
1554         }
1555
1556         pill = &req->rq_pill;
1557         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1558                              sizeof(*ioobj));
1559         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1560                              niocount * sizeof(*niobuf));
1561
1562         for (i = 0; i < page_count; i++) {
1563                 short_io_size += pga[i]->count;
1564                 if (!inode || !IS_ENCRYPTED(inode) ||
1565                     !llcrypt_has_encryption_key(inode)) {
1566                         pga[i]->bp_count_diff = 0;
1567                         pga[i]->bp_off_diff = 0;
1568                 }
1569         }
1570
1571         if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1572                 enable_checksum = false;
1573                 short_io_size = 0;
1574         }
1575
1576         /* Check if read/write is small enough to be a short io. */
1577         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1578             !imp_connect_shortio(cli->cl_import))
1579                 short_io_size = 0;
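        /* Illustration: a single 4 KiB page write, with cl_max_short_io_bytes
         * >= 4096 and a server that supports short io, is copied inline into
         * the request buffer further below instead of using a bulk transfer.
         */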
1580
1581         /* If this is an empty RPC to an old server, just ignore it */
1582         if (!short_io_size && !pga[0]->pg) {
1583                 ptlrpc_request_free(req);
1584                 RETURN(-ENODATA);
1585         }
1586
1587         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1588                              opc == OST_READ ? 0 : short_io_size);
1589         if (opc == OST_READ)
1590                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1591                                      short_io_size);
1592
1593         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1594         if (rc) {
1595                 ptlrpc_request_free(req);
1596                 RETURN(rc);
1597         }
1598         osc_set_io_portal(req);
1599
1600         ptlrpc_at_set_req_timeout(req);
1601         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1602          * retry logic */
1603         req->rq_no_retry_einprogress = 1;
1604
1605         if (short_io_size != 0) {
1606                 desc = NULL;
1607                 short_io_buf = NULL;
1608                 goto no_bulk;
1609         }
1610
1611         desc = ptlrpc_prep_bulk_imp(req, page_count,
1612                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1613                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1614                         PTLRPC_BULK_PUT_SINK),
1615                 OST_BULK_PORTAL,
1616                 &ptlrpc_bulk_kiov_pin_ops);
1617
1618         if (desc == NULL)
1619                 GOTO(out, rc = -ENOMEM);
1620         /* NB request now owns desc and will free it when it gets freed */
1621 no_bulk:
1622         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1623         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1624         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1625         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1626
1627         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1628
1629         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1630          * and from_kgid(), because they are asynchronous. Fortunately, the oa
1631          * variable contains valid o_uid and o_gid for these two operations.
1632          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1633          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1634          * other processing logic */
1635         body->oa.o_uid = oa->o_uid;
1636         body->oa.o_gid = oa->o_gid;
1637
1638         obdo_to_ioobj(oa, ioobj);
1639         ioobj->ioo_bufcnt = niocount;
1640         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1641          * bulks that might be sent for this request.  The actual number is
1642          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1643          * sends "max - 1" for compatibility with old clients sending "0", and
1644          * so that the actual maximum is a power-of-two number, not one less. LU-1431 */
1645         if (desc != NULL)
1646                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1647         else /* short io */
1648                 ioobj_max_brw_set(ioobj, 0);
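        /* Following the encoding described above (hypothetical value): with
         * desc->bd_md_max_brw == 4, the wire value is 3 and the server
         * derives the maximum as "value + 1" == 4, keeping the maximum a
         * power of two even for old clients that send 0.
         */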
1649
1650         if (short_io_size != 0) {
1651                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1652                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1653                         body->oa.o_flags = 0;
1654                 }
1655                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1656                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1657                        short_io_size);
1658                 if (opc == OST_WRITE) {
1659                         short_io_buf = req_capsule_client_get(pill,
1660                                                               &RMF_SHORT_IO);
1661                         LASSERT(short_io_buf != NULL);
1662                 }
1663         }
1664
1665         LASSERT(page_count > 0);
1666         pg_prev = pga[0];
1667         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1668                 struct brw_page *pg = pga[i];
1669                 int poff = pg->off & ~PAGE_MASK;
1670
1671                 LASSERT(pg->count > 0);
1672                 /* make sure there is no gap in the middle of the page array */
1673                 LASSERTF(page_count == 1 ||
1674                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1675                           ergo(i > 0 && i < page_count - 1,
1676                                poff == 0 && pg->count == PAGE_SIZE)   &&
1677                           ergo(i == page_count - 1, poff == 0)),
1678                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1679                          i, page_count, pg, pg->off, pg->count);
1680                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1681                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1682                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1683                          i, page_count,
1684                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1685                          pg_prev->pg, page_private(pg_prev->pg),
1686                          pg_prev->pg->index, pg_prev->off);
1687                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1688                         (pg->flag & OBD_BRW_SRVLOCK));
1689                 if (short_io_size != 0 && opc == OST_WRITE) {
1690                         unsigned char *ptr = kmap_atomic(pg->pg);
1691
1692                         LASSERT(short_io_size >= requested_nob + pg->count);
1693                         memcpy(short_io_buf + requested_nob,
1694                                ptr + poff,
1695                                pg->count);
1696                         kunmap_atomic(ptr);
1697                 } else if (short_io_size == 0) {
1698                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1699                                                          pg->count);
1700                 }
1701                 requested_nob += pg->count;
1702
1703                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1704                         niobuf--;
1705                         niobuf->rnb_len += pg->count;
1706                 } else {
1707                         niobuf->rnb_offset = pg->off;
1708                         niobuf->rnb_len    = pg->count;
1709                         niobuf->rnb_flags  = pg->flag;
1710                 }
1711                 pg_prev = pg;
1712         }
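        /* Illustration: 256 contiguous 4 KiB pages collapse into a single
         * niobuf with rnb_len = 1 MiB; any discontiguity in offset or flags
         * starts a new niobuf, matching the niocount computed earlier via
         * can_merge_pages().
         */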
1713
1714         LASSERTF((void *)(niobuf - niocount) ==
1715                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1716                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1717                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1718
1719         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1720         if (resend) {
1721                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1722                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1723                         body->oa.o_flags = 0;
1724                 }
1725                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1726         }
1727
1728         if (osc_should_shrink_grant(cli))
1729                 osc_shrink_grant_local(cli, &body->oa);
1730
1731         if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1732                 enable_checksum = false;
1733
1734         /* size[REQ_REC_OFF] still sizeof (*body) */
1735         if (opc == OST_WRITE) {
1736                 if (enable_checksum) {
1737                         /* store cl_cksum_type in a local variable since
1738                          * it can be changed via lprocfs */
1739                         enum cksum_types cksum_type = cli->cl_cksum_type;
1740
1741                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1742                                 body->oa.o_flags = 0;
1743
1744                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1745                                                                 cksum_type);
1746                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1747
1748                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1749                                                   requested_nob, page_count,
1750                                                   pga, OST_WRITE,
1751                                                   &body->oa.o_cksum, resend);
1752                         if (rc < 0) {
1753                                 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1754                                        rc);
1755                                 GOTO(out, rc);
1756                         }
1757                         CDEBUG(D_PAGE | (resend ? D_HA : 0),
1758                                "checksum at write origin: %x (%x)\n",
1759                                body->oa.o_cksum, cksum_type);
1760
1761                         /* save this in 'oa', too, for later checking */
1762                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1763                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1764                                                            cksum_type);
1765                 } else {
1766                         /* clear out the checksum flag, in case this is a
1767                          * resend but cl_checksum is no longer set. b=11238 */
1768                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1769                 }
1770                 oa->o_cksum = body->oa.o_cksum;
1771                 /* 1 RC per niobuf */
1772                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1773                                      sizeof(__u32) * niocount);
1774         } else {
1775                 if (enable_checksum) {
1776                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1777                                 body->oa.o_flags = 0;
1778                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1779                                 cli->cl_cksum_type);
1780                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1781                 }
1782
1783                 /* The client cksum has already been copied to the wire obdo by
1784                  * the previous lustre_set_wire_obdo(); in case a bulk read is
1785                  * being resent due to a cksum error, this allows the server to
1786                  * check+dump the pages on its side */
1787         }
1788         ptlrpc_request_set_replen(req);
1789
1790         aa = ptlrpc_req_async_args(aa, req);
1791         aa->aa_oa = oa;
1792         aa->aa_requested_nob = requested_nob;
1793         aa->aa_nio_count = niocount;
1794         aa->aa_page_count = page_count;
1795         aa->aa_resends = 0;
1796         aa->aa_ppga = pga;
1797         aa->aa_cli = cli;
1798         INIT_LIST_HEAD(&aa->aa_oaps);
1799
1800         *reqp = req;
1801         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1802         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1803                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1804                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1805         RETURN(0);
1806
1807  out:
1808         ptlrpc_req_finished(req);
1809         RETURN(rc);
1810 }
1811
1812 char dbgcksum_file_name[PATH_MAX];
1813
1814 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1815                                 struct brw_page **pga, __u32 server_cksum,
1816                                 __u32 client_cksum)
1817 {
1818         struct file *filp;
1819         int rc, i;
1820         unsigned int len;
1821         char *buf;
1822
1823         /* only keep a dump of the pages on the first error for the same range
1824          * in the file/fid, not during resends/retries. */
1825         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1826                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1827                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1828                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1829                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1830                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1831                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1832                  pga[0]->off,
1833                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1834                  client_cksum, server_cksum);
1835         CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
1836         filp = filp_open(dbgcksum_file_name,
1837                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1838         if (IS_ERR(filp)) {
1839                 rc = PTR_ERR(filp);
1840                 if (rc == -EEXIST)
1841                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1842                                "checksum error: rc = %d\n", dbgcksum_file_name,
1843                                rc);
1844                 else
1845                         CERROR("%s: can't open to dump pages with checksum "
1846                                "error: rc = %d\n", dbgcksum_file_name, rc);
1847                 return;
1848         }
1849
1850         for (i = 0; i < page_count; i++) {
1851                 len = pga[i]->count;
1852                 buf = kmap(pga[i]->pg);
1853                 while (len != 0) {
1854                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1855                         if (rc < 0) {
1856                                 CERROR("%s: wanted to write %u but got %d "
1857                                        "error\n", dbgcksum_file_name, len, rc);
1858                                 break;
1859                         }
1860                         len -= rc;
1861                         buf += rc;
1862                 }
1863                 kunmap(pga[i]->pg);
1864         }
1865
1866         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1867         if (rc)
1868                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1869         filp_close(filp, NULL);
1870
1871         libcfs_debug_dumplog();
1872 }
1873
1874 static int
1875 check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
1876                      __u32 client_cksum, __u32 server_cksum,
1877                      struct osc_brw_async_args *aa)
1878 {
1879         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1880         enum cksum_types cksum_type;
1881         obd_dif_csum_fn *fn = NULL;
1882         int sector_size = 0;
1883         __u32 new_cksum;
1884         char *msg;
1885         int rc;
1886
1887         if (server_cksum == client_cksum) {
1888                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1889                 return 0;
1890         }
1891
1892         if (aa->aa_cli->cl_checksum_dump)
1893                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1894                                     server_cksum, client_cksum);
1895
1896         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1897                                            oa->o_flags : 0);
1898
1899         switch (cksum_type) {
1900         case OBD_CKSUM_T10IP512:
1901                 fn = obd_dif_ip_fn;
1902                 sector_size = 512;
1903                 break;
1904         case OBD_CKSUM_T10IP4K:
1905                 fn = obd_dif_ip_fn;
1906                 sector_size = 4096;
1907                 break;
1908         case OBD_CKSUM_T10CRC512:
1909                 fn = obd_dif_crc_fn;
1910                 sector_size = 512;
1911                 break;
1912         case OBD_CKSUM_T10CRC4K:
1913                 fn = obd_dif_crc_fn;
1914                 sector_size = 4096;
1915                 break;
1916         default:
1917                 break;
1918         }
1919
1920         if (fn)
1921                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1922                                              aa->aa_page_count, aa->aa_ppga,
1923                                              OST_WRITE, fn, sector_size,
1924                                              &new_cksum, true);
1925         else
1926                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1927                                        aa->aa_ppga, OST_WRITE, cksum_type,
1928                                        &new_cksum);
1929
1930         if (rc < 0)
1931                 msg = "failed to calculate the client write checksum";
1932         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1933                 msg = "the server did not use the checksum type specified in "
1934                       "the original request - likely a protocol problem";
1935         else if (new_cksum == server_cksum)
1936                 msg = "changed on the client after we checksummed it - "
1937                       "likely false positive due to mmap IO (bug 11742)";
1938         else if (new_cksum == client_cksum)
1939                 msg = "changed in transit before arrival at OST";
1940         else
1941                 msg = "changed in transit AND doesn't match the original - "
1942                       "likely false positive due to mmap IO (bug 11742)";
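        /* In short, recomputing the checksum locally distinguishes: data
         * that changed on the client after the original checksum (matches
         * the server), data corrupted in transit (still matches the original
         * client checksum), and data that changed locally and also differs
         * from the original.
         */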
1943
1944         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1945                            DFID " object "DOSTID" extent [%llu-%llu], original "
1946                            "client csum %x (type %x), server csum %x (type %x),"
1947                            " client csum now %x\n",
1948                            obd_name, msg, libcfs_nidstr(&peer->nid),
1949                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1950                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1951                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1952                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1953                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1954                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1955                            client_cksum,
1956                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1957                            server_cksum, cksum_type, new_cksum);
1958         return 1;
1959 }
1960
1961 /* Note rc enters this function as the number of bytes transferred */
1962 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1963 {
1964         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1965         struct client_obd *cli = aa->aa_cli;
1966         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1967         const struct lnet_processid *peer =
1968                 &req->rq_import->imp_connection->c_peer;
1969         struct ost_body *body;
1970         u32 client_cksum = 0;
1971         struct inode *inode = NULL;
1972         unsigned int blockbits = 0, blocksize = 0;
1973         struct cl_page *clpage;
1974
1975         ENTRY;
1976
1977         if (rc < 0 && rc != -EDQUOT) {
1978                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1979                 RETURN(rc);
1980         }
1981
1982         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1983         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1984         if (body == NULL) {
1985                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1986                 RETURN(-EPROTO);
1987         }
1988
1989         /* set/clear over quota flag for a uid/gid/projid */
1990         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1991             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1992                 unsigned qid[LL_MAXQUOTAS] = {
1993                                          body->oa.o_uid, body->oa.o_gid,
1994                                          body->oa.o_projid };
1995                 CDEBUG(D_QUOTA,
1996                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1997                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1998                        body->oa.o_valid, body->oa.o_flags);
1999                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2000                                 body->oa.o_flags);
2001         }
2002
2003         osc_update_grant(cli, body);
2004
2005         if (rc < 0)
2006                 RETURN(rc);
2007
2008         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2009                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
2010
2011         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2012                 if (rc > 0) {
2013                         CERROR("%s: unexpected positive size %d\n",
2014                                obd_name, rc);
2015                         RETURN(-EPROTO);
2016                 }
2017
2018                 if (req->rq_bulk != NULL &&
2019                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2020                         RETURN(-EAGAIN);
2021
2022                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2023                     check_write_checksum(&body->oa, peer, client_cksum,
2024                                          body->oa.o_cksum, aa))
2025                         RETURN(-EAGAIN);
2026
2027                 rc = check_write_rcs(req, aa->aa_requested_nob,
2028                                      aa->aa_nio_count, aa->aa_page_count,
2029                                      aa->aa_ppga);
2030                 GOTO(out, rc);
2031         }
2032
2033         /* The rest of this function executes only for OST_READs */
2034
2035         if (req->rq_bulk == NULL) {
2036                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2037                                           RCL_SERVER);
2038                 LASSERT(rc == req->rq_status);
2039         } else {
2040                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2041                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2042         }
2043         if (rc < 0)
2044                 GOTO(out, rc = -EAGAIN);
2045
2046         if (rc > aa->aa_requested_nob) {
2047                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2048                        rc, aa->aa_requested_nob);
2049                 RETURN(-EPROTO);
2050         }
2051
2052         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2053                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2054                        rc, req->rq_bulk->bd_nob_transferred);
2055                 RETURN(-EPROTO);
2056         }
2057
2058         if (req->rq_bulk == NULL) {
2059                 /* short io */
2060                 int nob, pg_count, i = 0;
2061                 unsigned char *buf;
2062
2063                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2064                 pg_count = aa->aa_page_count;
2065                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2066                                                    rc);
2067                 nob = rc;
2068                 while (nob > 0 && pg_count > 0) {
2069                         unsigned char *ptr;
2070                         int count = aa->aa_ppga[i]->count > nob ?
2071                                     nob : aa->aa_ppga[i]->count;
2072
2073                         CDEBUG(D_CACHE, "page %p count %d\n",
2074                                aa->aa_ppga[i]->pg, count);
2075                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2076                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2077                                count);
2078                         kunmap_atomic((void *) ptr);
2079
2080                         buf += count;
2081                         nob -= count;
2082                         i++;
2083                         pg_count--;
2084                 }
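                /* Illustration: a 6000-byte short io reply spanning two
                 * 4096-byte pages copies 4096 bytes into the first page and
                 * the remaining 1904 bytes into the second.
                 */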
2085         }
2086
2087         if (rc < aa->aa_requested_nob)
2088                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2089
2090         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2091                 static int cksum_counter;
2092                 u32 server_cksum = body->oa.o_cksum;
2093                 int nob = rc;
2094                 char *via = "";
2095                 char *router = "";
2096                 enum cksum_types cksum_type;
2097                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2098                         body->oa.o_flags : 0;
2099
2100                 cksum_type = obd_cksum_type_unpack(o_flags);
2101                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2102                                           aa->aa_page_count, aa->aa_ppga,
2103                                           OST_READ, &client_cksum, false);
2104                 if (rc < 0)
2105                         GOTO(out, rc);
2106
2107                 if (req->rq_bulk != NULL &&
2108                     lnet_nid_to_nid4(&peer->nid) != req->rq_bulk->bd_sender) {
2109                         via = " via ";
2110                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2111                 }
2112
2113                 if (server_cksum != client_cksum) {
2114                         struct ost_body *clbody;
2115                         __u32 client_cksum2;
2116                         u32 page_count = aa->aa_page_count;
2117
2118                         osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2119                                              page_count, aa->aa_ppga,
2120                                              OST_READ, &client_cksum2, true);
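                        /* client_cksum2 is recomputed over the same pages so
                         * the message below can show whether the data changed
                         * between the two local computations (e.g. under mmap
                         * IO), mirroring the write-side diagnosis in
                         * check_write_checksum().
                         */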
2121                         clbody = req_capsule_client_get(&req->rq_pill,
2122                                                         &RMF_OST_BODY);
2123                         if (cli->cl_checksum_dump)
2124                                 dump_all_bulk_pages(&clbody->oa, page_count,
2125                                                     aa->aa_ppga, server_cksum,
2126                                                     client_cksum);
2127
2128                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2129                                            "%s%s%s inode "DFID" object "DOSTID
2130                                            " extent [%llu-%llu], client %x/%x, "
2131                                            "server %x, cksum_type %x\n",
2132                                            obd_name,
2133                                            libcfs_nidstr(&peer->nid),
2134                                            via, router,
2135                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2136                                                 clbody->oa.o_parent_seq : 0ULL,
2137                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2138                                                 clbody->oa.o_parent_oid : 0,
2139                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2140                                                 clbody->oa.o_parent_ver : 0,
2141                                            POSTID(&body->oa.o_oi),
2142                                            aa->aa_ppga[0]->off,
2143                                            aa->aa_ppga[page_count-1]->off +
2144                                            aa->aa_ppga[page_count-1]->count - 1,
2145                                            client_cksum, client_cksum2,
2146                                            server_cksum, cksum_type);
2147                         cksum_counter = 0;
2148                         aa->aa_oa->o_cksum = client_cksum;
2149                         rc = -EAGAIN;
2150                 } else {
2151                         cksum_counter++;
2152                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2153                         rc = 0;
2154                 }
2155         } else if (unlikely(client_cksum)) {
2156                 static int cksum_missed;
2157
2158                 cksum_missed++;
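                /* log only when cksum_missed is a power of two (x & -x == x),
                 * to rate-limit the console noise
                 */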
2159                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2160                         CERROR("%s: checksum %u requested from %s but not sent\n",
2161                                obd_name, cksum_missed,
2162                                libcfs_nidstr(&peer->nid));
2163         } else {
2164                 rc = 0;
2165         }
2166
2167         /* get the inode from the first cl_page */
2168         clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
2169         inode = clpage->cp_inode;
2170         if (clpage->cp_type == CPT_TRANSIENT && inode) {
2171                 blockbits = inode->i_blkbits;
2172                 blocksize = 1 << blockbits;
2173         }
2174         if (inode && IS_ENCRYPTED(inode)) {
2175                 int idx;
2176
2177                 if (!llcrypt_has_encryption_key(inode)) {
2178                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2179                         GOTO(out, rc);
2180                 }
2181                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2182                         struct brw_page *brwpg = aa->aa_ppga[idx];
2183                         unsigned int offs = 0;
2184
2185                         while (offs < PAGE_SIZE) {
2186                                 /* do not decrypt if page is all 0s */
2187                                 if (memchr_inv(page_address(brwpg->pg) + offs,
2188                                       0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2189                                         /* if the page is empty, forward this
2190                                          * info to upper layers (ll_io_zero_page)
2191                                          * by clearing PagePrivate2
2192                                          */
2193                                         if (!offs)
2194                                                 ClearPagePrivate2(brwpg->pg);
2195                                         break;
2196                                 }
2197
2198                                 if (blockbits) {
2199                                         /* This is the direct IO case: call the
2200                                          * decrypt function that takes the inode
2201                                          * as an input parameter directly. The
2202                                          * page does not need to be locked.
2203                                          */
2204                                         u64 lblk_num;
2205                                         unsigned int i;
2206
2207                                         clpage =
2208                                                oap2cl_page(brw_page2oap(brwpg));
2209                                         lblk_num =
2210                                                 ((u64)(clpage->cp_page_index) <<
2211                                                 (PAGE_SHIFT - blockbits)) +
2212                                                 (offs >> blockbits);
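                                        /* Illustration (hypothetical values):
                                         * with 1024-byte blocks (blockbits ==
                                         * 10) and 4096-byte pages, page index
                                         * 3 and offs 2048 give lblk_num =
                                         * 3 * 4 + 2 = 14.
                                         */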
2213                                         for (i = offs;
2214                                              i < offs +
2215                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2216                                              i += blocksize, lblk_num++) {
2217                                                 rc =
2218                                                   llcrypt_decrypt_block_inplace(
2219                                                           inode, brwpg->pg,
2220                                                           blocksize, i,
2221                                                           lblk_num);
2222                                                 if (rc)
2223                                                         break;
2224                                         }
2225                                 } else {
2226                                         rc = llcrypt_decrypt_pagecache_blocks(
2227                                                 brwpg->pg,
2228                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2229                                                 offs);
2230                                 }
2231                                 if (rc)
2232                                         GOTO(out, rc);
2233
2234                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2235                         }
2236                 }
2237         }
2238
2239 out:
2240         if (rc >= 0)
2241                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2242                                      aa->aa_oa, &body->oa);
2243
2244         RETURN(rc);
2245 }
2246
2247 static int osc_brw_redo_request(struct ptlrpc_request *request,
2248                                 struct osc_brw_async_args *aa, int rc)
2249 {
2250         struct ptlrpc_request *new_req;
2251         struct osc_brw_async_args *new_aa;
2252         struct osc_async_page *oap;
2253         ENTRY;
2254
2255         /* The below message is checked in replay-ost-single.sh test_8ae */
2256         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2257                   "redo for recoverable error %d", rc);
2258
2259         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2260                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2261                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2262                                   aa->aa_ppga, &new_req, 1);
2263         if (rc)
2264                 RETURN(rc);
2265
2266         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2267                 if (oap->oap_request != NULL) {
2268                         LASSERTF(request == oap->oap_request,
2269                                  "request %p != oap_request %p\n",
2270                                  request, oap->oap_request);
2271                 }
2272         }
2273         /*
2274          * The new request takes over the pga and oaps from the old request.
2275          * Note that copying a list_head doesn't work, it needs to be moved...
2276          */
2277         aa->aa_resends++;
2278         new_req->rq_interpret_reply = request->rq_interpret_reply;
2279         new_req->rq_async_args = request->rq_async_args;
2280         new_req->rq_commit_cb = request->rq_commit_cb;
2281         /* cap resend delay to the current request timeout, this is similar to
2282          * what ptlrpc does (see after_reply()) */
2283         if (aa->aa_resends > new_req->rq_timeout)
2284                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2285         else
2286                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
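        /* e.g. with rq_timeout == 30s, the 3rd resend is delayed by 3s,
         * while the 31st and later resends are all capped at a 30s delay.
         */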
2287         new_req->rq_generation_set = 1;
2288         new_req->rq_import_generation = request->rq_import_generation;
2289
2290         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2291
2292         INIT_LIST_HEAD(&new_aa->aa_oaps);
2293         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2294         INIT_LIST_HEAD(&new_aa->aa_exts);
2295         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2296         new_aa->aa_resends = aa->aa_resends;
2297
2298         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2299                 if (oap->oap_request) {
2300                         ptlrpc_req_finished(oap->oap_request);
2301                         oap->oap_request = ptlrpc_request_addref(new_req);
2302                 }
2303         }
2304
2305         /* XXX: This code will run into problems if we ever support adding
2306          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2307          * waiting for all of them to finish. We should inherit the request
2308          * set from the old request. */
2309         ptlrpcd_add_req(new_req);
2310
2311         DEBUG_REQ(D_INFO, new_req, "new request");
2312         RETURN(0);
2313 }
2314
2315 /*
2316  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2317  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2318  * fine for our small page arrays and doesn't require allocation.  it's an
2319  * insertion sort that swaps elements that are strides apart, shrinking the
2320  * stride down until it's '1' and the array is sorted.
2321  */
2322 static void sort_brw_pages(struct brw_page **array, int num)
2323 {
2324         int stride, i, j;
2325         struct brw_page *tmp;
2326
2327         if (num == 1)
2328                 return;
2329         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2330                 ;
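        /* stride follows the 3h+1 sequence 1, 4, 13, 40, ...; e.g. for
         * num == 10 it grows to 13, and the passes below then run with
         * gaps 4 and finally 1 (a plain insertion sort) after the /= 3.
         */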
2331
2332         do {
2333                 stride /= 3;
2334                 for (i = stride ; i < num ; i++) {
2335                         tmp = array[i];
2336                         j = i;
2337                         while (j >= stride && array[j - stride]->off > tmp->off) {
2338                                 array[j] = array[j - stride];
2339                                 j -= stride;
2340                         }
2341                         array[j] = tmp;
2342                 }
2343         } while (stride > 1);
2344 }
2345
2346 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2347 {
2348         LASSERT(ppga != NULL);
2349         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2350 }
2351
2352 static int brw_interpret(const struct lu_env *env,
2353                          struct ptlrpc_request *req, void *args, int rc)
2354 {
2355         struct osc_brw_async_args *aa = args;
2356         struct osc_extent *ext;
2357         struct osc_extent *tmp;
2358         struct client_obd *cli = aa->aa_cli;
2359         unsigned long transferred = 0;
2360
2361         ENTRY;
2362
2363         rc = osc_brw_fini_request(req, rc);
2364         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2365
2366         /* restore clear text pages */
2367         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2368
2369         /*
2370          * When server returns -EINPROGRESS, client should always retry
2371          * regardless of the number of times the bulk was resent already.
2372          */
2373         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2374                 if (req->rq_import_generation !=
2375                     req->rq_import->imp_generation) {
2376                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2377                                ""DOSTID", rc = %d.\n",
2378                                req->rq_import->imp_obd->obd_name,
2379                                POSTID(&aa->aa_oa->o_oi), rc);
2380                 } else if (rc == -EINPROGRESS ||
2381                            client_should_resend(aa->aa_resends, aa->aa_cli)) {
2382                         rc = osc_brw_redo_request(req, aa, rc);
2383                 } else {
2384                         CERROR("%s: too many resent retries for object: "
2385                                "%llu:%llu, rc = %d.\n",
2386                                req->rq_import->imp_obd->obd_name,
2387                                POSTID(&aa->aa_oa->o_oi), rc);
2388                 }
2389
2390                 if (rc == 0)
2391                         RETURN(0);
2392                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2393                         rc = -EIO;
2394         }
2395
2396         if (rc == 0) {
2397                 struct obdo *oa = aa->aa_oa;
2398                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2399                 unsigned long valid = 0;
2400                 struct cl_object *obj;
2401                 struct osc_async_page *last;
2402
2403                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2404                 obj = osc2cl(last->oap_obj);
2405
2406                 cl_object_attr_lock(obj);
2407                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2408                         attr->cat_blocks = oa->o_blocks;
2409                         valid |= CAT_BLOCKS;
2410                 }
2411                 if (oa->o_valid & OBD_MD_FLMTIME) {
2412                         attr->cat_mtime = oa->o_mtime;
2413                         valid |= CAT_MTIME;
2414                 }
2415                 if (oa->o_valid & OBD_MD_FLATIME) {
2416                         attr->cat_atime = oa->o_atime;
2417                         valid |= CAT_ATIME;
2418                 }
2419                 if (oa->o_valid & OBD_MD_FLCTIME) {
2420                         attr->cat_ctime = oa->o_ctime;
2421                         valid |= CAT_CTIME;
2422                 }
2423
2424                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2425                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2426                         loff_t last_off = last->oap_count + last->oap_obj_off +
2427                                 last->oap_page_off;
2428
2429                         /* Change the file size if this is an out-of-quota or
2430                          * direct IO write and it extends the file size */
2431                         if (loi->loi_lvb.lvb_size < last_off) {
2432                                 attr->cat_size = last_off;
2433                                 valid |= CAT_SIZE;
2434                         }
2435                         /* Extend KMS if it's not a lockless write */
2436                         if (loi->loi_kms < last_off &&
2437                             oap2osc_page(last)->ops_srvlock == 0) {
2438                                 attr->cat_kms = last_off;
2439                                 valid |= CAT_KMS;
2440                         }
2441                 }
2442
2443                 if (valid != 0)
2444                         cl_object_attr_update(env, obj, attr, valid);
2445                 cl_object_attr_unlock(obj);
2446         }
2447         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2448         aa->aa_oa = NULL;
2449
2450         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2451                 osc_inc_unstable_pages(req);
2452
2453         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2454                 list_del_init(&ext->oe_link);
2455                 osc_extent_finish(env, ext, 1,
2456                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2457         }
2458         LASSERT(list_empty(&aa->aa_exts));
2459         LASSERT(list_empty(&aa->aa_oaps));
2460
2461         transferred = (req->rq_bulk == NULL ? /* short io */
2462                        aa->aa_requested_nob :
2463                        req->rq_bulk->bd_nob_transferred);
2464
2465         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2466         ptlrpc_lprocfs_brw(req, transferred);
2467
2468         spin_lock(&cli->cl_loi_list_lock);
2469         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2470          * is called so we know whether to go to sync BRWs or wait for more
2471          * RPCs to complete */
2472         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2473                 cli->cl_w_in_flight--;
2474         else
2475                 cli->cl_r_in_flight--;
2476         osc_wake_cache_waiters(cli);
2477         spin_unlock(&cli->cl_loi_list_lock);
2478
2479         osc_io_unplug(env, cli, NULL);
2480         RETURN(rc);
2481 }
2482
2483 static void brw_commit(struct ptlrpc_request *req)
2484 {
2485         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2486          * this callback (invoked via rq_commit_cb), we need to ensure
2487          * osc_dec_unstable_pages is still called. Otherwise unstable
2488          * pages may be leaked. */
2489         spin_lock(&req->rq_lock);
2490         if (likely(req->rq_unstable)) {
2491                 req->rq_unstable = 0;
2492                 spin_unlock(&req->rq_lock);
2493
2494                 osc_dec_unstable_pages(req);
2495         } else {
2496                 req->rq_committed = 1;
2497                 spin_unlock(&req->rq_lock);
2498         }
2499 }
2500
2501 /**
2502  * Build an RPC from the list of extents @ext_list. The caller must ensure
2503  * that the total number of pages in this list is NOT over the max pages
2504  * per RPC. Extents in the list must be in OES_RPC state.
2505  */
2506 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2507                   struct list_head *ext_list, int cmd)
2508 {
2509         struct ptlrpc_request           *req = NULL;
2510         struct osc_extent               *ext;
2511         struct brw_page                 **pga = NULL;
2512         struct osc_brw_async_args       *aa = NULL;
2513         struct obdo                     *oa = NULL;
2514         struct osc_async_page           *oap;
2515         struct osc_object               *obj = NULL;
2516         struct cl_req_attr              *crattr = NULL;
2517         loff_t                          starting_offset = OBD_OBJECT_EOF;
2518         loff_t                          ending_offset = 0;
2519         /* '1' for consistency with code that checks !mpflag to restore */
2520         int mpflag = 1;
2521         int                             mem_tight = 0;
2522         int                             page_count = 0;
2523         bool                            soft_sync = false;
2524         bool                            ndelay = false;
2525         int                             i;
2526         int                             grant = 0;
2527         int                             rc;
2528         __u32                           layout_version = 0;
2529         LIST_HEAD(rpc_list);
2530         struct ost_body                 *body;
2531         ENTRY;
2532         LASSERT(!list_empty(ext_list));
2533
2534         /* add pages into rpc_list to build BRW rpc */
2535         list_for_each_entry(ext, ext_list, oe_link) {
2536                 LASSERT(ext->oe_state == OES_RPC);
2537                 mem_tight |= ext->oe_memalloc;
2538                 grant += ext->oe_grants;
2539                 page_count += ext->oe_nr_pages;
2540                 layout_version = max(layout_version, ext->oe_layout_version);
2541                 if (obj == NULL)
2542                         obj = ext->oe_obj;
2543         }
2544
2545         soft_sync = osc_over_unstable_soft_limit(cli);
2546         if (mem_tight)
2547                 mpflag = memalloc_noreclaim_save();
2548
2549         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2550         if (pga == NULL)
2551                 GOTO(out, rc = -ENOMEM);
2552
2553         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2554         if (oa == NULL)
2555                 GOTO(out, rc = -ENOMEM);
2556
2557         i = 0;
2558         list_for_each_entry(ext, ext_list, oe_link) {
2559                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2560                         if (mem_tight)
2561                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2562                         if (soft_sync)
2563                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2564                         pga[i] = &oap->oap_brw_page;
2565                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2566                         i++;
2567
2568                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2569                         if (starting_offset == OBD_OBJECT_EOF ||
2570                             starting_offset > oap->oap_obj_off)
2571                                 starting_offset = oap->oap_obj_off;
2572                         else
2573                                 LASSERT(oap->oap_page_off == 0);
2574                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2575                                 ending_offset = oap->oap_obj_off +
2576                                                 oap->oap_count;
2577                         else
2578                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2579                                         PAGE_SIZE);
2580                 }
2581                 if (ext->oe_ndelay)
2582                         ndelay = true;
2583         }
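        /* The asserts above enforce that the pages across all extents form a
         * single contiguous byte range: only the first page may start
         * mid-page and only the last page may end mid-page, mirroring the
         * layout checks in osc_brw_prep_request().
         */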
2584
2585         /* first page in the list */
2586         oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2587
2588         crattr = &osc_env_info(env)->oti_req_attr;
2589         memset(crattr, 0, sizeof(*crattr));
2590         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2591         crattr->cra_flags = ~0ULL;
2592         crattr->cra_page = oap2cl_page(oap);
2593         crattr->cra_oa = oa;
2594         cl_req_attr_set(env, osc2cl(obj), crattr);
2595
2596         if (cmd == OBD_BRW_WRITE) {
2597                 oa->o_grant_used = grant;
2598                 if (layout_version > 0) {
2599                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2600                                PFID(&oa->o_oi.oi_fid), layout_version);
2601
2602                         oa->o_layout_version = layout_version;
2603                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2604                 }
2605         }
2606
2607         sort_brw_pages(pga, page_count);
2608         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2609         if (rc != 0) {
2610                 CERROR("prep_req failed: %d\n", rc);
2611                 GOTO(out, rc);
2612         }
2613
2614         req->rq_commit_cb = brw_commit;
2615         req->rq_interpret_reply = brw_interpret;
2616         req->rq_memalloc = mem_tight != 0;
2617         oap->oap_request = ptlrpc_request_addref(req);
2618         if (ndelay) {
2619                 req->rq_no_resend = req->rq_no_delay = 1;
2620                 /* we should probably set a shorter timeout value here,
2621                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2622                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2623         }
2624
2625         /* Need to update the timestamps after the request is built in case
2626          * we race with setattr (locally or in the queue at the OST).  If the
2627          * OST gets the later setattr before the earlier BRW (as determined by
2628          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2629          * there is no obvious way to do this in a single call.  bug 10150 */
2630         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2631         crattr->cra_oa = &body->oa;
2632         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2633         cl_req_attr_set(env, osc2cl(obj), crattr);
2634         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2635
2636         aa = ptlrpc_req_async_args(aa, req);
2637         INIT_LIST_HEAD(&aa->aa_oaps);
2638         list_splice_init(&rpc_list, &aa->aa_oaps);
2639         INIT_LIST_HEAD(&aa->aa_exts);
2640         list_splice_init(ext_list, &aa->aa_exts);
2641
2642         spin_lock(&cli->cl_loi_list_lock);
2643         starting_offset >>= PAGE_SHIFT;
2644         if (cmd == OBD_BRW_READ) {
2645                 cli->cl_r_in_flight++;
2646                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2647                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2648                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2649                                       starting_offset + 1);
2650         } else {
2651                 cli->cl_w_in_flight++;
2652                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2653                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2654                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2655                                       starting_offset + 1);
2656         }
2657         spin_unlock(&cli->cl_loi_list_lock);
2658
2659         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2660                   page_count, aa, cli->cl_r_in_flight,
2661                   cli->cl_w_in_flight);
2662         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2663
2664         ptlrpcd_add_req(req);
2665         rc = 0;
2666         EXIT;
2667
2668 out:
2669         if (mem_tight)
2670                 memalloc_noreclaim_restore(mpflag);
2671
2672         if (rc != 0) {
2673                 LASSERT(req == NULL);
2674
2675                 if (oa)
2676                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2677                 if (pga) {
2678                         osc_release_bounce_pages(pga, page_count);
2679                         osc_release_ppga(pga, page_count);
2680                 }
2681         /* This should happen rarely and is pretty bad: it makes the
2682          * pending list not follow the dirty order.
2683          */
2684                 while ((ext = list_first_entry_or_null(ext_list,
2685                                                        struct osc_extent,
2686                                                        oe_link)) != NULL) {
2687                         list_del_init(&ext->oe_link);
2688                         osc_extent_finish(env, ext, 0, rc);
2689                 }
2690         }
2691         RETURN(rc);
2692 }
2693
2694 /* This is to refresh our lock in the face of no RPCs. */
2695 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2696 {
2697         struct ptlrpc_request *req;
2698         struct obdo oa;
2699         struct brw_page bpg = { .off = start, .count = 1};
2700         struct brw_page *pga = &bpg;
2701         int rc;
2702
2703         memset(&oa, 0, sizeof(oa));
2704         oa.o_oi = osc->oo_oinfo->loi_oi;
2705         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2706         /* For updated servers - don't do a read */
2707         oa.o_flags = OBD_FL_NORPC;
2708
2709         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2710                                   &req, 0);
2711
2712         /* If we succeeded, ship it off; if not, there's no point in doing
2713          * anything. Also no resends.
2714          * No interpret callback, no commit callback.
2715          */
2716         if (!rc) {
2717                 req->rq_no_resend = 1;
2718                 ptlrpcd_add_req(req);
2719         }
2720 }
2721
2722 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2723 {
2724         int set = 0;
2725
2726         LASSERT(lock != NULL);
2727
2728         lock_res_and_lock(lock);
2729
2730         if (lock->l_ast_data == NULL)
2731                 lock->l_ast_data = data;
2732         if (lock->l_ast_data == data)
2733                 set = 1;
2734
2735         unlock_res_and_lock(lock);
2736
2737         return set;
2738 }
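/* Editor's note -- illustration of the compare-and-set semantics above
 * (not part of the original source): the first caller to attach ast_data
 * wins, and later callers only "succeed" with the same pointer:
 *
 *      osc_set_lock_data(lock, obj1);  -> returns 1, l_ast_data = obj1
 *      osc_set_lock_data(lock, obj1);  -> returns 1, unchanged
 *      osc_set_lock_data(lock, obj2);  -> returns 0, l_ast_data stays obj1
 */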
2739
2740 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2741                      void *cookie, struct lustre_handle *lockh,
2742                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2743                      int errcode)
2744 {
2745         bool intent = *flags & LDLM_FL_HAS_INTENT;
2746         int rc;
2747         ENTRY;
2748
2749         /* The request was created before the ldlm_cli_enqueue() call. */
2750         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2751                 struct ldlm_reply *rep;
2752
2753                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2754                 LASSERT(rep != NULL);
2755
2756                 rep->lock_policy_res1 =
2757                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2758                 if (rep->lock_policy_res1)
2759                         errcode = rep->lock_policy_res1;
2760                 if (!speculative)
2761                         *flags |= LDLM_FL_LVB_READY;
2762         } else if (errcode == ELDLM_OK) {
2763                 *flags |= LDLM_FL_LVB_READY;
2764         }
2765
2766         /* Call the update callback. */
2767         rc = (*upcall)(cookie, lockh, errcode);
2768
2769         /* release the reference taken in ldlm_cli_enqueue() */
2770         if (errcode == ELDLM_LOCK_MATCHED)
2771                 errcode = ELDLM_OK;
2772         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2773                 ldlm_lock_decref(lockh, mode);
2774
2775         RETURN(rc);
2776 }
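/* Editor's note (illustration): for an intent enqueue (e.g. a glimpse)
 * that the server aborts with ELDLM_LOCK_ABORTED, the real result
 * travels in lock_policy_res1 and is unpacked above before the upcall
 * runs; a plain enqueue that returns ELDLM_OK simply has
 * LDLM_FL_LVB_READY set before the upcall is invoked. */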
2777
2778 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2779                           void *args, int rc)
2780 {
2781         struct osc_enqueue_args *aa = args;
2782         struct ldlm_lock *lock;
2783         struct lustre_handle *lockh = &aa->oa_lockh;
2784         enum ldlm_mode mode = aa->oa_mode;
2785         struct ost_lvb *lvb = aa->oa_lvb;
2786         __u32 lvb_len = sizeof(*lvb);
2787         __u64 flags = 0;
2788         struct ldlm_enqueue_info einfo = {
2789                 .ei_type = aa->oa_type,
2790                 .ei_mode = mode,
2791         };
2792
2793         ENTRY;
2794
2795         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2796          * be valid. */
2797         lock = ldlm_handle2lock(lockh);
2798         LASSERTF(lock != NULL,
2799                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2800                  lockh->cookie, req, aa);
2801
2802         /* Take an additional reference so that a blocking AST that
2803          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2804          * to arrive after an upcall has been executed by
2805          * osc_enqueue_fini(). */
2806         ldlm_lock_addref(lockh, mode);
2807
2808         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2809         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2810
2811         /* Let the CP AST grant the lock first. */
2812         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2813
2814         if (aa->oa_speculative) {
2815                 LASSERT(aa->oa_lvb == NULL);
2816                 LASSERT(aa->oa_flags == NULL);
2817                 aa->oa_flags = &flags;
2818         }
2819
2820         /* Complete the lock acquisition procedure. */
2821         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2822                                    lvb, lvb_len, lockh, rc, false);
2823         /* Complete osc stuff. */
2824         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2825                               aa->oa_flags, aa->oa_speculative, rc);
2826
2827         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2828
2829         ldlm_lock_decref(lockh, mode);
2830         LDLM_LOCK_PUT(lock);
2831         RETURN(rc);
2832 }
2833
2834 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2835  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2836  * other synchronous requests, but keeping some locks while trying to obtain
2837  * others may take a considerable amount of time in case of OST failure; and
2838  * when other sync requests cannot get a held lock released by a client, that
2839  * client is evicted from the cluster -- such scenarios make life difficult,
2840  * so release locks just after they are obtained. */
2841 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2842                      __u64 *flags, union ldlm_policy_data *policy,
2843                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2844                      void *cookie, struct ldlm_enqueue_info *einfo,
2845                      struct ptlrpc_request_set *rqset, int async,
2846                      bool speculative)
2847 {
2848         struct obd_device *obd = exp->exp_obd;
2849         struct lustre_handle lockh = { 0 };
2850         struct ptlrpc_request *req = NULL;
2851         int intent = *flags & LDLM_FL_HAS_INTENT;
2852         __u64 match_flags = *flags;
2853         enum ldlm_mode mode;
2854         int rc;
2855         ENTRY;
2856
2857         /* Filesystem lock extents are extended to page boundaries so that
2858          * dealing with the page cache is a little smoother.  */
2859         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2860         policy->l_extent.end |= ~PAGE_MASK;
2861
2862         /* Next, search for already existing extent locks that will cover us */
2863         /* If we're trying to read, we also search for an existing PW lock.  The
2864          * VFS and page cache already protect us locally, so lots of readers/
2865          * writers can share a single PW lock.
2866          *
2867          * There are problems with conversion deadlocks, so instead of
2868          * converting a read lock to a write lock, we'll just enqueue a new
2869          * one.
2870          *
2871          * At some point we should cancel the read lock instead of making them
2872          * send us a blocking callback, but there are problems with canceling
2873          * locks out from other users right now, too. */
2874         mode = einfo->ei_mode;
2875         if (einfo->ei_mode == LCK_PR)
2876                 mode |= LCK_PW;
2877         /* Normal lock requests must wait for the LVB to be ready before
2878          * matching a lock; speculative lock requests do not need to,
2879          * because they will not actually use the lock. */
2880         if (!speculative)
2881                 match_flags |= LDLM_FL_LVB_READY;
2882         if (intent != 0)
2883                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2884         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2885                                einfo->ei_type, policy, mode, &lockh);
2886         if (mode) {
2887                 struct ldlm_lock *matched;
2888
2889                 if (*flags & LDLM_FL_TEST_LOCK)
2890                         RETURN(ELDLM_OK);
2891
2892                 matched = ldlm_handle2lock(&lockh);
2893                 if (speculative) {
2894                         /* This DLM lock request is speculative, and does not
2895                          * have an associated IO request. Therefore if there
2896                          * is already a DLM lock, it will just inform the
2897                          * caller to cancel the request for this stripe. */
2898                         lock_res_and_lock(matched);
2899                         if (ldlm_extent_equal(&policy->l_extent,
2900                             &matched->l_policy_data.l_extent))
2901                                 rc = -EEXIST;
2902                         else
2903                                 rc = -ECANCELED;
2904                         unlock_res_and_lock(matched);
2905
2906                         ldlm_lock_decref(&lockh, mode);
2907                         LDLM_LOCK_PUT(matched);
2908                         RETURN(rc);
2909                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2910                         *flags |= LDLM_FL_LVB_READY;
2911
2912                         /* We already have a lock, and it's referenced. */
2913                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2914
2915                         ldlm_lock_decref(&lockh, mode);
2916                         LDLM_LOCK_PUT(matched);
2917                         RETURN(ELDLM_OK);
2918                 } else {
2919                         ldlm_lock_decref(&lockh, mode);
2920                         LDLM_LOCK_PUT(matched);
2921                 }
2922         }
2923
2924         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2925                 RETURN(-ENOLCK);
2926
2927         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2928         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2929
2930         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2931                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2932         if (async) {
2933                 if (!rc) {
2934                         struct osc_enqueue_args *aa;
2935                         aa = ptlrpc_req_async_args(aa, req);
2936                         aa->oa_exp         = exp;
2937                         aa->oa_mode        = einfo->ei_mode;
2938                         aa->oa_type        = einfo->ei_type;
2939                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2940                         aa->oa_upcall      = upcall;
2941                         aa->oa_cookie      = cookie;
2942                         aa->oa_speculative = speculative;
2943                         if (!speculative) {
2944                                 aa->oa_flags  = flags;
2945                                 aa->oa_lvb    = lvb;
2946                         } else {
2947                                 /* speculative locks essentially enqueue
2948                                  * a DLM lock in advance, so we don't care
2949                                  * about the result of the enqueue. */
2950                                 aa->oa_lvb    = NULL;
2951                                 aa->oa_flags  = NULL;
2952                         }
2953
2954                         req->rq_interpret_reply = osc_enqueue_interpret;
2955                         ptlrpc_set_add_req(rqset, req);
2956                 }
2957                 RETURN(rc);
2958         }
2959
2960         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2961                               flags, speculative, rc);
2962
2963         RETURN(rc);
2964 }
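/* Editor's illustration (hypothetical, not part of the original source):
 * a minimal osc_enqueue_upcall_f implementation, assuming the usual
 * signature int (*)(void *cookie, struct lustre_handle *lockh, int rc).
 * osc_enqueue_fini() invokes the upcall with the enqueue result and, on
 * success, drops the enqueue reference afterwards, so an upcall that
 * wants to keep the lock must take its own reference. */
static int example_enqueue_upcall(void *cookie, struct lustre_handle *lockh,
                                  int errcode)
{
        if (errcode != ELDLM_OK && errcode != ELDLM_LOCK_MATCHED) {
                CDEBUG(D_DLMTRACE, "enqueue failed: rc = %d\n", errcode);
                return errcode;
        }
        /* The lock behind lockh is granted and still referenced here;
         * pin it (e.g. with ldlm_lock_addref()) before returning if it
         * must outlive this callback. */
        return 0;
}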
2965
2966 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2967                    struct ldlm_res_id *res_id, enum ldlm_type type,
2968                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2969                    __u64 *flags, struct osc_object *obj,
2970                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2971 {
2972         struct obd_device *obd = exp->exp_obd;
2973         __u64 lflags = *flags;
2974         enum ldlm_mode rc;
2975         ENTRY;
2976
2977         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2978                 RETURN(-EIO);
2979
2980         /* Filesystem lock extents are extended to page boundaries so that
2981          * dealing with the page cache is a little smoother */
2982         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2983         policy->l_extent.end |= ~PAGE_MASK;
2984
2985         /* Next, search for already existing extent locks that will cover us */
2986         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2987                                         res_id, type, policy, mode, lockh,
2988                                         match_flags);
2989         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2990                 RETURN(rc);
2991
2992         if (obj != NULL) {
2993                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2994
2995                 LASSERT(lock != NULL);
2996                 if (osc_set_lock_data(lock, obj)) {
2997                         lock_res_and_lock(lock);
2998                         if (!ldlm_is_lvb_cached(lock)) {
2999                                 LASSERT(lock->l_ast_data == obj);
3000                                 osc_lock_lvb_update(env, obj, lock, NULL);
3001                                 ldlm_set_lvb_cached(lock);
3002                         }
3003                         unlock_res_and_lock(lock);
3004                 } else {
3005                         ldlm_lock_decref(lockh, rc);
3006                         rc = 0;
3007                 }
3008                 LDLM_LOCK_PUT(lock);
3009         }
3010         RETURN(rc);
3011 }
3012
3013 static int osc_statfs_interpret(const struct lu_env *env,
3014                                 struct ptlrpc_request *req, void *args, int rc)
3015 {
3016         struct osc_async_args *aa = args;
3017         struct obd_statfs *msfs;
3018
3019         ENTRY;
3020         if (rc == -EBADR)
3021                 /*
3022                  * The request has in fact never been sent due to issues at
3023                  * a higher level (LOV).  Exit immediately since the caller
3024                  * is aware of the problem and takes care of the clean up.
3025                  */
3026                 RETURN(rc);
3027
3028         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3029             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3030                 GOTO(out, rc = 0);
3031
3032         if (rc != 0)
3033                 GOTO(out, rc);
3034
3035         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3036         if (msfs == NULL)
3037                 GOTO(out, rc = -EPROTO);
3038
3039         *aa->aa_oi->oi_osfs = *msfs;
3040 out:
3041         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3042
3043         RETURN(rc);
3044 }
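/* Editor's illustration (hypothetical, not part of the original source):
 * a completion callback matching the oi_cb_up invocations above, which
 * pass the obd_info itself as the cookie along with the RPC result. */
static int example_statfs_cb(void *cookie, int rc)
{
        struct obd_info *oinfo = cookie;

        if (rc == 0 && oinfo->oi_osfs != NULL)
                CDEBUG(D_SUPER, "blocks %llu/%llu free\n",
                       oinfo->oi_osfs->os_bfree, oinfo->oi_osfs->os_blocks);
        return rc;
}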
3045
3046 static int osc_statfs_async(struct obd_export *exp,
3047                             struct obd_info *oinfo, time64_t max_age,
3048                             struct ptlrpc_request_set *rqset)
3049 {
3050         struct obd_device     *obd = class_exp2obd(exp);
3051         struct ptlrpc_request *req;
3052         struct osc_async_args *aa;
3053         int rc;
3054         ENTRY;
3055
3056         if (obd->obd_osfs_age >= max_age) {
3057                 CDEBUG(D_SUPER,
3058                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3059                        obd->obd_name, &obd->obd_osfs,
3060                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3061                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3062                 spin_lock(&obd->obd_osfs_lock);
3063                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3064                 spin_unlock(&obd->obd_osfs_lock);
3065                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3066                 if (oinfo->oi_cb_up)
3067                         oinfo->oi_cb_up(oinfo, 0);
3068
3069                 RETURN(0);
3070         }
3071
3072         /* We could possibly pass max_age in the request (as an absolute
3073          * timestamp or a "seconds.usec ago") so the target can avoid doing
3074          * extra calls into the filesystem if that isn't necessary (e.g.
3075          * during mount that would help a bit).  Having relative timestamps
3076          * is not so great if request processing is slow, while absolute
3077          * timestamps are not ideal because they need time synchronization. */
3078         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3079         if (req == NULL)
3080                 RETURN(-ENOMEM);
3081
3082         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3083         if (rc) {
3084                 ptlrpc_request_free(req);
3085                 RETURN(rc);
3086         }
3087         ptlrpc_request_set_replen(req);
3088         req->rq_request_portal = OST_CREATE_PORTAL;
3089         ptlrpc_at_set_req_timeout(req);
3090
3091         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3092                 /* procfs requests should not wait on statfs, to avoid deadlock */
3093                 req->rq_no_resend = 1;
3094                 req->rq_no_delay = 1;
3095         }
3096
3097         req->rq_interpret_reply = osc_statfs_interpret;
3098         aa = ptlrpc_req_async_args(aa, req);
3099         aa->aa_oi = oinfo;
3100
3101         ptlrpc_set_add_req(rqset, req);
3102         RETURN(0);
3103 }
3104
3105 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3106                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3107 {
3108         struct obd_device     *obd = class_exp2obd(exp);
3109         struct obd_statfs     *msfs;
3110         struct ptlrpc_request *req;
3111         struct obd_import     *imp, *imp0;
3112         int rc;
3113         ENTRY;
3114
3115         /* Since the request might also come from lprocfs, we need to
3116          * sync this with client_disconnect_export(). Bug 15684
3117          */
3118         with_imp_locked(obd, imp0, rc)
3119                 imp = class_import_get(imp0);
3120         if (rc)
3121                 RETURN(rc);
3122
3123         /* We could possibly pass max_age in the request (as an absolute
3124          * timestamp or a "seconds.usec ago") so the target can avoid doing
3125          * extra calls into the filesystem if that isn't necessary (e.g.
3126          * during mount that would help a bit).  Having relative timestamps
3127          * is not so great if request processing is slow, while absolute
3128          * timestamps are not ideal because they need time synchronization. */
3129         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3130
3131         class_import_put(imp);
3132
3133         if (req == NULL)
3134                 RETURN(-ENOMEM);
3135
3136         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3137         if (rc) {
3138                 ptlrpc_request_free(req);
3139                 RETURN(rc);
3140         }
3141         ptlrpc_request_set_replen(req);
3142         req->rq_request_portal = OST_CREATE_PORTAL;
3143         ptlrpc_at_set_req_timeout(req);
3144
3145         if (flags & OBD_STATFS_NODELAY) {
3146                 /* procfs requests should not wait on statfs, to avoid deadlock */
3147                 req->rq_no_resend = 1;
3148                 req->rq_no_delay = 1;
3149         }
3150
3151         rc = ptlrpc_queue_wait(req);
3152         if (rc)
3153                 GOTO(out, rc);
3154
3155         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3156         if (msfs == NULL)
3157                 GOTO(out, rc = -EPROTO);
3158
3159         *osfs = *msfs;
3160
3161         EXIT;
3162 out:
3163         ptlrpc_req_finished(req);
3164         return rc;
3165 }
3166
3167 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3168                          void *karg, void __user *uarg)
3169 {
3170         struct obd_device *obd = exp->exp_obd;
3171         struct obd_ioctl_data *data = karg;
3172         int rc = 0;
3173
3174         ENTRY;
3175         if (!try_module_get(THIS_MODULE)) {
3176                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3177                        module_name(THIS_MODULE));
3178                 return -EINVAL;
3179         }
3180         switch (cmd) {
3181         case OBD_IOC_CLIENT_RECOVER:
3182                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3183                                            data->ioc_inlbuf1, 0);
3184                 if (rc > 0)
3185                         rc = 0;
3186                 break;
3187         case OBD_IOC_GETATTR:
3188                 rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
3189                 break;
3190         case IOC_OSC_SET_ACTIVE:
3191                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3192                                               data->ioc_offset);
3193                 break;
3194         default:
3195                 rc = -ENOTTY;
3196                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3197                        obd->obd_name, cmd, current->comm, rc);
3198                 break;
3199         }
3200
3201         module_put(THIS_MODULE);
3202         return rc;
3203 }
3204
3205 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3206                        u32 keylen, void *key, u32 vallen, void *val,
3207                        struct ptlrpc_request_set *set)
3208 {
3209         struct ptlrpc_request *req;
3210         struct obd_device     *obd = exp->exp_obd;
3211         struct obd_import     *imp = class_exp2cliimp(exp);
3212         char                  *tmp;
3213         int                    rc;
3214         ENTRY;
3215
3216         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3217
3218         if (KEY_IS(KEY_CHECKSUM)) {
3219                 if (vallen != sizeof(int))
3220                         RETURN(-EINVAL);
3221                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3222                 RETURN(0);
3223         }
3224
3225         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3226                 sptlrpc_conf_client_adapt(obd);
3227                 RETURN(0);
3228         }
3229
3230         if (KEY_IS(KEY_FLUSH_CTX)) {
3231                 sptlrpc_import_flush_my_ctx(imp);
3232                 RETURN(0);
3233         }
3234
3235         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3236                 struct client_obd *cli = &obd->u.cli;
3237                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3238                 long target = *(long *)val;
3239
3240                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3241                 *(long *)val -= nr;
3242                 RETURN(0);
3243         }
3244
3245         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3246                 RETURN(-EINVAL);
3247
3248         /* We pass all other commands directly to OST. Since nobody calls osc
3249            methods directly and everybody is supposed to go through LOV, we
3250            assume LOV has checked for invalid values for us.
3251            The only recognised values so far are evict_by_nid and mds_conn.
3252            Even if something bad goes through, we'd get a -EINVAL from OST
3253            anyway. */
3254
3255         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3256                                                 &RQF_OST_SET_GRANT_INFO :
3257                                                 &RQF_OBD_SET_INFO);
3258         if (req == NULL)
3259                 RETURN(-ENOMEM);
3260
3261         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3262                              RCL_CLIENT, keylen);
3263         if (!KEY_IS(KEY_GRANT_SHRINK))
3264                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3265                                      RCL_CLIENT, vallen);
3266         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3267         if (rc) {
3268                 ptlrpc_request_free(req);
3269                 RETURN(rc);
3270         }
3271
3272         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3273         memcpy(tmp, key, keylen);
3274         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3275                                                         &RMF_OST_BODY :
3276                                                         &RMF_SETINFO_VAL);
3277         memcpy(tmp, val, vallen);
3278
3279         if (KEY_IS(KEY_GRANT_SHRINK)) {
3280                 struct osc_grant_args *aa;
3281                 struct obdo *oa;
3282
3283                 aa = ptlrpc_req_async_args(aa, req);
3284                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3285                 if (!oa) {
3286                         ptlrpc_req_finished(req);
3287                         RETURN(-ENOMEM);
3288                 }
3289                 *oa = ((struct ost_body *)val)->oa;
3290                 aa->aa_oa = oa;
3291                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3292         }
3293
3294         ptlrpc_request_set_replen(req);
3295         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3296                 LASSERT(set != NULL);
3297                 ptlrpc_set_add_req(set, req);
3298                 ptlrpc_check_set(NULL, set);
3299         } else {
3300                 ptlrpcd_add_req(req);
3301         }
3302
3303         RETURN(0);
3304 }
3305 EXPORT_SYMBOL(osc_set_info_async);
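/* Editor's illustration (hypothetical caller, not part of the original
 * source): the KEY_CHECKSUM branch above is handled entirely locally and
 * returns before any request is built, so passing set == NULL is fine
 * for this key. */
static int example_toggle_checksum(const struct lu_env *env,
                                   struct obd_export *exp, bool enable)
{
        int val = enable ? 1 : 0;

        return osc_set_info_async(env, exp, strlen(KEY_CHECKSUM),
                                  KEY_CHECKSUM, sizeof(val), &val, NULL);
}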
3306
3307 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3308                   struct obd_device *obd, struct obd_uuid *cluuid,
3309                   struct obd_connect_data *data, void *localdata)
3310 {
3311         struct client_obd *cli = &obd->u.cli;
3312
3313         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3314                 long lost_grant;
3315                 long grant;
3316
3317                 spin_lock(&cli->cl_loi_list_lock);
3318                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3319                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3320                         /* restore ocd_grant_blkbits as client page bits */
3321                         data->ocd_grant_blkbits = PAGE_SHIFT;
3322                         grant += cli->cl_dirty_grant;
3323                 } else {
3324                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3325                 }
3326                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3327                 lost_grant = cli->cl_lost_grant;
3328                 cli->cl_lost_grant = 0;
3329                 spin_unlock(&cli->cl_loi_list_lock);
3330
3331                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3332                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3333                        data->ocd_version, data->ocd_grant, lost_grant);
3334         }
3335
3336         RETURN(0);
3337 }
3338 EXPORT_SYMBOL(osc_reconnect);
3339
3340 int osc_disconnect(struct obd_export *exp)
3341 {
3342         struct obd_device *obd = class_exp2obd(exp);
3343         int rc;
3344
3345         rc = client_disconnect_export(exp);
3346         /**
3347          * Initially we put del_shrink_grant before disconnect_export, but it
3348          * causes the following problem if setup (connect) and cleanup
3349          * (disconnect) are tangled together.
3350          *      connect p1                     disconnect p2
3351          *   ptlrpc_connect_import
3352          *     ...............               class_manual_cleanup
3353          *                                     osc_disconnect
3354          *                                     del_shrink_grant
3355          *   ptlrpc_connect_interrupt
3356          *     osc_init_grant
3357          *   add this client to shrink list
3358          *                                      cleanup_osc
3359          * Bang! the grant shrink thread triggers the shrink. BUG18662
3360          */
3361         osc_del_grant_list(&obd->u.cli);
3362         return rc;
3363 }
3364 EXPORT_SYMBOL(osc_disconnect);
3365
3366 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3367                                  struct hlist_node *hnode, void *arg)
3368 {
3369         struct lu_env *env = arg;
3370         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3371         struct ldlm_lock *lock;
3372         struct osc_object *osc = NULL;
3373         ENTRY;
3374
3375         lock_res(res);
3376         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3377                 if (lock->l_ast_data != NULL && osc == NULL) {
3378                         osc = lock->l_ast_data;
3379                         cl_object_get(osc2cl(osc));
3380                 }
3381
3382                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3383                  * by the 2nd round of the ldlm_namespace_cleanup() call in
3384                  * osc_import_event(). */
3385                 ldlm_clear_cleaned(lock);
3386         }
3387         unlock_res(res);
3388
3389         if (osc != NULL) {
3390                 osc_object_invalidate(env, osc);
3391                 cl_object_put(env, osc2cl(osc));
3392         }
3393
3394         RETURN(0);
3395 }
3396 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3397
3398 static int osc_import_event(struct obd_device *obd,
3399                             struct obd_import *imp,
3400                             enum obd_import_event event)
3401 {
3402         struct client_obd *cli;
3403         int rc = 0;
3404
3405         ENTRY;
3406         LASSERT(imp->imp_obd == obd);
3407
3408         switch (event) {
3409         case IMP_EVENT_DISCON: {
3410                 cli = &obd->u.cli;
3411                 spin_lock(&cli->cl_loi_list_lock);
3412                 cli->cl_avail_grant = 0;
3413                 cli->cl_lost_grant = 0;
3414                 spin_unlock(&cli->cl_loi_list_lock);
3415                 break;
3416         }
3417         case IMP_EVENT_INACTIVE: {
3418                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3419                 break;
3420         }
3421         case IMP_EVENT_INVALIDATE: {
3422                 struct ldlm_namespace *ns = obd->obd_namespace;
3423                 struct lu_env         *env;
3424                 __u16                  refcheck;
3425
3426                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3427
3428                 env = cl_env_get(&refcheck);
3429                 if (!IS_ERR(env)) {
3430                         osc_io_unplug(env, &obd->u.cli, NULL);
3431
3432                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3433                                                  osc_ldlm_resource_invalidate,
3434                                                  env, 0);
3435                         cl_env_put(env, &refcheck);
3436
3437                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3438                 } else
3439                         rc = PTR_ERR(env);
3440                 break;
3441         }
3442         case IMP_EVENT_ACTIVE: {
3443                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3444                 break;
3445         }
3446         case IMP_EVENT_OCD: {
3447                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3448
3449                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3450                         osc_init_grant(&obd->u.cli, ocd);
3451
3452                 /* See bug 7198 */
3453                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3454                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3455
3456                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3457                 break;
3458         }
3459         case IMP_EVENT_DEACTIVATE: {
3460                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3461                 break;
3462         }
3463         case IMP_EVENT_ACTIVATE: {
3464                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3465                 break;
3466         }
3467         default:
3468                 CERROR("Unknown import event %d\n", event);
3469                 LBUG();
3470         }
3471         RETURN(rc);
3472 }
3473
3474 /**
3475  * Determine whether the lock can be canceled before replaying the lock
3476  * during recovery; see bug 16774 for detailed information.
3477  *
3478  * \retval zero the lock can't be canceled
3479  * \retval other ok to cancel
3480  */
3481 static int osc_cancel_weight(struct ldlm_lock *lock)
3482 {
3483         /*
3484          * Cancel all unused and granted extent locks.
3485          */
3486         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3487             ldlm_is_granted(lock) &&
3488             osc_ldlm_weigh_ast(lock) == 0)
3489                 RETURN(1);
3490
3491         RETURN(0);
3492 }
3493
3494 static int brw_queue_work(const struct lu_env *env, void *data)
3495 {
3496         struct client_obd *cli = data;
3497
3498         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3499
3500         osc_io_unplug(env, cli, NULL);
3501         RETURN(0);
3502 }
3503
3504 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3505 {
3506         struct client_obd *cli = &obd->u.cli;
3507         void *handler;
3508         int rc;
3509
3510         ENTRY;
3511
3512         rc = ptlrpcd_addref();
3513         if (rc)
3514                 RETURN(rc);
3515
3516         rc = client_obd_setup(obd, lcfg);
3517         if (rc)
3518                 GOTO(out_ptlrpcd, rc);
3519
3520
3521         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3522         if (IS_ERR(handler))
3523                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3524         cli->cl_writeback_work = handler;
3525
3526         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3527         if (IS_ERR(handler))
3528                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3529         cli->cl_lru_work = handler;
3530
3531         rc = osc_quota_setup(obd);
3532         if (rc)
3533                 GOTO(out_ptlrpcd_work, rc);
3534
3535         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3536         cli->cl_root_squash = 0;
3537         osc_update_next_shrink(cli);
3538
3539         RETURN(rc);
3540
3541 out_ptlrpcd_work:
3542         if (cli->cl_writeback_work != NULL) {
3543                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3544                 cli->cl_writeback_work = NULL;
3545         }
3546         if (cli->cl_lru_work != NULL) {
3547                 ptlrpcd_destroy_work(cli->cl_lru_work);
3548                 cli->cl_lru_work = NULL;
3549         }
3550         client_obd_cleanup(obd);
3551 out_ptlrpcd:
3552         ptlrpcd_decref();
3553         RETURN(rc);
3554 }
3555 EXPORT_SYMBOL(osc_setup_common);
3556
3557 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3558 {
3559         struct client_obd *cli = &obd->u.cli;
3560         int                adding;
3561         int                added;
3562         int                req_count;
3563         int                rc;
3564
3565         ENTRY;
3566
3567         rc = osc_setup_common(obd, lcfg);
3568         if (rc < 0)
3569                 RETURN(rc);
3570
3571         rc = osc_tunables_init(obd);
3572         if (rc)
3573                 RETURN(rc);
3574
3575         /*
3576          * We try to control the total number of requests with an upper limit,
3577          * osc_reqpool_maxreqcount. There might be a race that causes
3578          * over-limit allocation, but it is fine.
3579          */
3580         req_count = atomic_read(&osc_pool_req_count);
3581         if (req_count < osc_reqpool_maxreqcount) {
3582                 adding = cli->cl_max_rpcs_in_flight + 2;
3583                 if (req_count + adding > osc_reqpool_maxreqcount)
3584                         adding = osc_reqpool_maxreqcount - req_count;
3585
3586                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3587                 atomic_add(added, &osc_pool_req_count);
3588         }
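        /* Editor's note -- worked example (numbers hypothetical): with
         * cl_max_rpcs_in_flight = 8, adding = 10; if 155 requests already
         * exist out of a maximum of 160, only 5 more are added and
         * osc_pool_req_count moves to 160. */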
3589
3590         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3591
3592         spin_lock(&osc_shrink_lock);
3593         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3594         spin_unlock(&osc_shrink_lock);
3595         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3596         cli->cl_import->imp_idle_debug = D_HA;
3597
3598         RETURN(0);
3599 }
3600
3601 int osc_precleanup_common(struct obd_device *obd)
3602 {
3603         struct client_obd *cli = &obd->u.cli;
3604         ENTRY;
3605
3606         /* LU-464
3607          * for echo client, export may be on zombie list, wait for
3608          * zombie thread to cull it, because cli.cl_import will be
3609          * cleared in client_disconnect_export():
3610          *   class_export_destroy() -> obd_cleanup() ->
3611          *   echo_device_free() -> echo_client_cleanup() ->
3612          *   obd_disconnect() -> osc_disconnect() ->
3613          *   client_disconnect_export()
3614          */
3615         obd_zombie_barrier();
3616         if (cli->cl_writeback_work) {
3617                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3618                 cli->cl_writeback_work = NULL;
3619         }
3620
3621         if (cli->cl_lru_work) {
3622                 ptlrpcd_destroy_work(cli->cl_lru_work);
3623                 cli->cl_lru_work = NULL;
3624         }
3625
3626         obd_cleanup_client_import(obd);
3627         RETURN(0);
3628 }
3629 EXPORT_SYMBOL(osc_precleanup_common);
3630
3631 static int osc_precleanup(struct obd_device *obd)
3632 {
3633         ENTRY;
3634
3635         osc_precleanup_common(obd);
3636
3637         ptlrpc_lprocfs_unregister_obd(obd);
3638         RETURN(0);
3639 }
3640
3641 int osc_cleanup_common(struct obd_device *obd)
3642 {
3643         struct client_obd *cli = &obd->u.cli;
3644         int rc;
3645
3646         ENTRY;
3647
3648         spin_lock(&osc_shrink_lock);
3649         list_del(&cli->cl_shrink_list);
3650         spin_unlock(&osc_shrink_lock);
3651
3652         /* lru cleanup */
3653         if (cli->cl_cache != NULL) {
3654                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3655                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3656                 list_del_init(&cli->cl_lru_osc);
3657                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3658                 cli->cl_lru_left = NULL;
3659                 cl_cache_decref(cli->cl_cache);
3660                 cli->cl_cache = NULL;
3661         }
3662
3663         /* free memory of osc quota cache */
3664         osc_quota_cleanup(obd);
3665
3666         rc = client_obd_cleanup(obd);
3667
3668         ptlrpcd_decref();
3669         RETURN(rc);
3670 }
3671 EXPORT_SYMBOL(osc_cleanup_common);
3672
3673 static const struct obd_ops osc_obd_ops = {
3674         .o_owner                = THIS_MODULE,
3675         .o_setup                = osc_setup,
3676         .o_precleanup           = osc_precleanup,
3677         .o_cleanup              = osc_cleanup_common,
3678         .o_add_conn             = client_import_add_conn,
3679         .o_del_conn             = client_import_del_conn,
3680         .o_connect              = client_connect_import,
3681         .o_reconnect            = osc_reconnect,
3682         .o_disconnect           = osc_disconnect,
3683         .o_statfs               = osc_statfs,
3684         .o_statfs_async         = osc_statfs_async,
3685         .o_create               = osc_create,
3686         .o_destroy              = osc_destroy,
3687         .o_getattr              = osc_getattr,
3688         .o_setattr              = osc_setattr,
3689         .o_iocontrol            = osc_iocontrol,
3690         .o_set_info_async       = osc_set_info_async,
3691         .o_import_event         = osc_import_event,
3692         .o_quotactl             = osc_quotactl,
3693 };
3694
3695 LIST_HEAD(osc_shrink_list);
3696 DEFINE_SPINLOCK(osc_shrink_lock);
3697
3698 #ifdef HAVE_SHRINKER_COUNT
3699 static struct shrinker osc_cache_shrinker = {
3700         .count_objects  = osc_cache_shrink_count,
3701         .scan_objects   = osc_cache_shrink_scan,
3702         .seeks          = DEFAULT_SEEKS,
3703 };
3704 #else
3705 static int osc_cache_shrink(struct shrinker *shrinker,
3706                             struct shrink_control *sc)
3707 {
3708         (void)osc_cache_shrink_scan(shrinker, sc);
3709
3710         return osc_cache_shrink_count(shrinker, sc);
3711 }
3712
3713 static struct shrinker osc_cache_shrinker = {
3714         .shrink   = osc_cache_shrink,
3715         .seeks    = DEFAULT_SEEKS,
3716 };
3717 #endif
3718
3719 static int __init osc_init(void)
3720 {
3721         unsigned int reqpool_size;
3722         unsigned int reqsize;
3723         int rc;
3724         ENTRY;
3725
3726         /* Print the address of _any_ initialized kernel symbol from this
3727          * module, to allow debugging with a gdb that doesn't support data
3728          * symbols from modules. */
3729         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3730
3731         rc = lu_kmem_init(osc_caches);
3732         if (rc)
3733                 RETURN(rc);
3734
3735         rc = class_register_type(&osc_obd_ops, NULL, true,
3736                                  LUSTRE_OSC_NAME, &osc_device_type);
3737         if (rc)
3738                 GOTO(out_kmem, rc);
3739
3740         rc = register_shrinker(&osc_cache_shrinker);
3741         if (rc)
3742                 GOTO(out_type, rc);
3743
3744         /* This is obviously too much memory; we only prevent overflow here */
3745         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3746                 GOTO(out_shrinker, rc = -EINVAL);
3747
3748         reqpool_size = osc_reqpool_mem_max << 20;
3749
3750         reqsize = 1;
3751         while (reqsize < OST_IO_MAXREQSIZE)
3752                 reqsize = reqsize << 1;
3753
3754         /*
3755          * We don't enlarge the request count in the OSC pool according to
3756          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3757          * after normal allocation has failed, so a small OSC pool won't
3758          * cause much performance degradation in most cases.
3759          */
3760         osc_reqpool_maxreqcount = reqpool_size / reqsize;
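        /* Editor's note -- worked example (OST_IO_MAXREQSIZE value assumed
         * for illustration): the loop above rounds the request size up to
         * a power of two, so a 5 MB pool (the default osc_reqpool_mem_max)
         * with a rounded request size of 32 KB holds
         * (5 << 20) / 32768 = 160 requests. */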
3761
3762         atomic_set(&osc_pool_req_count, 0);
3763         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3764                                           ptlrpc_add_rqs_to_pool);
3765
3766         if (osc_rq_pool == NULL)
3767                 GOTO(out_shrinker, rc = -ENOMEM);
3768
3769         rc = osc_start_grant_work();
3770         if (rc != 0)
3771                 GOTO(out_req_pool, rc);
3772
3773         RETURN(rc);
3774
3775 out_req_pool:
3776         ptlrpc_free_rq_pool(osc_rq_pool);
3777 out_shrinker:
3778         unregister_shrinker(&osc_cache_shrinker);
3779 out_type:
3780         class_unregister_type(LUSTRE_OSC_NAME);
3781 out_kmem:
3782         lu_kmem_fini(osc_caches);
3783
3784         RETURN(rc);
3785 }
3786
3787 static void __exit osc_exit(void)
3788 {
3789         osc_stop_grant_work();
3790         unregister_shrinker(&osc_cache_shrinker);
3791         class_unregister_type(LUSTRE_OSC_NAME);
3792         lu_kmem_fini(osc_caches);
3793         ptlrpc_free_rq_pool(osc_rq_pool);
3794 }
3795
3796 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3797 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3798 MODULE_VERSION(LUSTRE_VERSION_STRING);
3799 MODULE_LICENSE("GPL");
3800
3801 module_init(osc_init);
3802 module_exit(osc_exit);