LU-14160 fallocate: Add punch mode to fallocate
lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

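/*
 * Per-request state stashed in ptlrpc_request::rq_async_args for the
 * reply-interpret callbacks below: the obdo to update from the reply,
 * plus the caller's completion upcall and its opaque cookie.
 */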
struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

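/* Pack @oa into the request body in wire format, applying the conversions
 * implied by the import's connect data. */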
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

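/* Synchronously fetch the attributes of the object described by @oa from
 * the OST and update @oa from the reply. */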
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

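/*
 * Send an OST_SETATTR RPC without blocking.  If @rqset is NULL the request
 * is handed straight to ptlrpcd and no reply interpretation or upcall
 * happens; otherwise the request is added to @rqset and @upcall is invoked
 * with @cookie once the reply has been interpreted.
 */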
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for a response. Upcall and cookie may also
 * be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

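/*
 * Synchronously create an OST object.  Only used for echo objects: the
 * object sequence is asserted to be an echo sequence.
 */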
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

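/*
 * Send an OST_PUNCH RPC asynchronously via ptlrpcd, reusing the setattr
 * reply-interpret path: @upcall is invoked with @cookie when the punch
 * completes.  The object and punch range travel in @oa.
 */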
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC finishes
 * @cookie:     Opaque data passed to @upcall
 * @mode:       Operation done on the given range
 *
 * Currently only preallocation (mode 0 or FALLOC_FL_KEEP_SIZE) and hole
 * punching (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE) are supported;
 * other mode flags are not supported yet. ftruncate(2) and truncate(2)
 * are handled via a separate SETATTR request.
 *
 * Return: 0 on success, negative errno on failure.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(imp, &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
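
/*
 * A minimal usage sketch (hypothetical caller, not part of this file):
 * the obdo carries the target object and range, with the start/end offsets
 * stored in o_size/o_blocks by the caller, and the upcall fires once the
 * OST replies.  The names osc_io_cbk() and my_cookie are illustrative only.
 *
 *      oa->o_size = start;
 *      oa->o_blocks = end;
 *      oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *      rc = osc_fallocate_base(osc_export(obj), oa, osc_io_cbk, my_cookie,
 *                              FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE);
 */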

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

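/*
 * Send an OST_SYNC RPC for @obj.  The start/end of the range to sync are
 * carried in the size and blocks fields of @oa (see the comment in the
 * body); on reply the object's blocks attribute is refreshed and @upcall
 * is invoked with @cookie.
 */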
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally any locks matched by @mode in the resource derived
 * from @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, where we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

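/*
 * Throttle object destroy RPCs: returns 1 if the caller may send another
 * OST_DESTROY (the in-flight count stays within cl_max_rpcs_in_flight),
 * 0 otherwise.  The inc/dec pair is careful to wake up a waiter if the
 * counter was modified between the two atomic operations.
 */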
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

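/*
 * Fill in the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so that cache and grant state is piggybacked to the
 * server on outgoing requests.
 */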
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,
               "%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
               cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped,
               oa->o_grant, cli->cl_lost_grant);
}


void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * Grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

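/*
 * Release grant back to the server until only @target_bytes remain locally,
 * but never below one full RPC worth of grant.  Worked example (assumed
 * values): with cl_max_rpcs_in_flight = 8 and 4 MiB RPCs, osc_shrink_grant()
 * above first targets (8 + 1) * 4 MiB = 36 MiB, and once at that level a
 * further shrink drops the target to a single 4 MiB RPC.
 */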
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

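/*
 * Periodic work that walks the registered clients and returns unused grant
 * to the servers, sending at most GRANT_SHRINK_RPC_BATCH shrink RPCs per
 * pass, then re-arms itself for the earliest upcoming cl_next_shrink_grant.
 */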
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

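/*
 * Initialize client-side grant accounting from the server's connect data:
 * seed cl_avail_grant from ocd_grant (net of already-consumed dirty data),
 * derive the chunk size and maximum extent size when GRANT_PARAM is
 * supported, and register for grant shrinking when GRANT_SHRINK is.
 */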
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

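/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: fail if any
 * niobuf reported an error or an unexpected nonzero value, and make sure
 * the bulk actually transferred the number of bytes requested.
 */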
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

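/*
 * Compute a T10-PI guard-tag based bulk checksum: generate per-sector DIF
 * guard tags for each page with @fn, accumulate them in a bounce page, and
 * hash the collected tags (with the OBD_CKSUM_T10_TOP algorithm) into the
 * final 32-bit checksum.
 */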
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

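/*
 * Undo the bounce-page substitution done for encrypted writes in
 * osc_brw_prep_request(): free llcrypt bounce pages (identified by the
 * PageChecked flag) and restore the original count/offset of each brw_page.
 */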
static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

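/*
 * Build a BRW (bulk read/write) request for the given page array: allocate
 * the RPC (from the pre-allocated pool for writes), substitute llcrypt
 * bounce pages for encrypted inodes, and count how many niobufs the merged
 * contiguous pages will need.
 */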
1376 static int
1377 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1378                      u32 page_count, struct brw_page **pga,
1379                      struct ptlrpc_request **reqp, int resend)
1380 {
1381         struct ptlrpc_request *req;
1382         struct ptlrpc_bulk_desc *desc;
1383         struct ost_body *body;
1384         struct obd_ioobj *ioobj;
1385         struct niobuf_remote *niobuf;
1386         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1387         struct osc_brw_async_args *aa;
1388         struct req_capsule *pill;
1389         struct brw_page *pg_prev;
1390         void *short_io_buf;
1391         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1392         struct inode *inode;
1393         bool directio = false;
1394
1395         ENTRY;
1396         inode = page2inode(pga[0]->pg);
1397         if (inode == NULL) {
1398                 /* Try to get reference to inode from cl_page if we are
1399                  * dealing with direct IO, as handled pages are not
1400                  * actual page cache pages.
1401                  */
1402                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1403                 struct cl_page *clpage = oap2cl_page(oap);
1404
1405                 inode = clpage->cp_inode;
1406                 if (inode)
1407                         directio = true;
1408         }
1409         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1410                 RETURN(-ENOMEM); /* Recoverable */
1411         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1412                 RETURN(-EINVAL); /* Fatal */
1413
1414         if ((cmd & OBD_BRW_WRITE) != 0) {
1415                 opc = OST_WRITE;
1416                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1417                                                 osc_rq_pool,
1418                                                 &RQF_OST_BRW_WRITE);
1419         } else {
1420                 opc = OST_READ;
1421                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1422         }
1423         if (req == NULL)
1424                 RETURN(-ENOMEM);
1425
1426         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1427                 for (i = 0; i < page_count; i++) {
1428                         struct brw_page *pg = pga[i];
1429                         struct page *data_page = NULL;
1430                         bool retried = false;
1431                         bool lockedbymyself;
1432                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1433                         struct address_space *map_orig = NULL;
1434                         pgoff_t index_orig;
1435
1436 retry_encrypt:
1437                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1438                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1439                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1440                         /* The page can already be locked when we arrive here.
1441                          * This is possible when cl_page_assume/vvp_page_assume
1442                          * is stuck on wait_on_page_writeback with page lock
1443                          * held. In this case there is no risk for the lock to
1444                          * be released while we are doing our encryption
1445                          * processing, because writeback against that page will
1446                          * end in vvp_page_completion_write/cl_page_completion,
1447                          * which means only once the page is fully processed.
1448                          */
1449                         lockedbymyself = trylock_page(pg->pg);
1450                         if (directio) {
1451                                 map_orig = pg->pg->mapping;
1452                                 pg->pg->mapping = inode->i_mapping;
1453                                 index_orig = pg->pg->index;
1454                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1455                         }
1456                         data_page =
1457                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1458                                                                  nunits, 0,
1459                                                                  GFP_NOFS);
1460                         if (directio) {
1461                                 pg->pg->mapping = map_orig;
1462                                 pg->pg->index = index_orig;
1463                         }
1464                         if (lockedbymyself)
1465                                 unlock_page(pg->pg);
1466                         if (IS_ERR(data_page)) {
1467                                 rc = PTR_ERR(data_page);
1468                                 if (rc == -ENOMEM && !retried) {
1469                                         retried = true;
1470                                         rc = 0;
1471                                         goto retry_encrypt;
1472                                 }
1473                                 ptlrpc_request_free(req);
1474                                 RETURN(rc);
1475                         }
1476                         /* Set PageChecked flag on bounce page for
1477                          * disambiguation in osc_release_bounce_pages().
1478                          */
1479                         SetPageChecked(data_page);
1480                         pg->pg = data_page;
1481                         /* there should be no gap in the middle of the page array */
1482                         if (i == page_count - 1) {
1483                                 struct osc_async_page *oap = brw_page2oap(pg);
1484
1485                                 oa->o_size = oap->oap_count +
1486                                         oap->oap_obj_off + oap->oap_page_off;
1487                         }
1488                         /* len is forced to nunits, and the relative offset
1489                          * to 0, so store the old, clear-text values
1490                          */
1491                         pg->bp_count_diff = nunits - pg->count;
1492                         pg->count = nunits;
1493                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1494                         pg->off = pg->off & PAGE_MASK;
1495                 }
1496         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1497                 for (i = 0; i < page_count; i++) {
1498                         struct brw_page *pg = pga[i];
1499                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1500
1501                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1502                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1503                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1504                         /* count/off are forced to cover the whole encryption
1505                          * unit size so that all encrypted data is stored on
1506                          * the OST; adjust bp_{count,off}_diff to remember the
1507                          * size of the clear text.
1508                          */
1509                         pg->bp_count_diff = nunits - pg->count;
1510                         pg->count = nunits;
1511                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1512                         pg->off = pg->off & PAGE_MASK;
1513                 }
1514         }
1515
1516         for (niocount = i = 1; i < page_count; i++) {
1517                 if (!can_merge_pages(pga[i - 1], pga[i]))
1518                         niocount++;
1519         }
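        /* niocount is the number of remote niobufs left after merging
         * file-contiguous pages, e.g. pages covering offsets [0-4095],
         * [4096-8191] and [16384-20479] yield two niobufs (assuming
         * can_merge_pages() merges adjacent extents with compatible flags).
         */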
1520
1521         pill = &req->rq_pill;
1522         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1523                              sizeof(*ioobj));
1524         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1525                              niocount * sizeof(*niobuf));
1526
1527         for (i = 0; i < page_count; i++) {
1528                 short_io_size += pga[i]->count;
1529                 if (!inode || !IS_ENCRYPTED(inode)) {
1530                         pga[i]->bp_count_diff = 0;
1531                         pga[i]->bp_off_diff = 0;
1532                 }
1533         }
1534
1535         /* Check if read/write is small enough to be a short io. */
1536         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1537             !imp_connect_shortio(cli->cl_import))
1538                 short_io_size = 0;
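        /* A short io carries the data inline in the request (write) or reply
         * (read) buffer instead of setting up a bulk transfer, avoiding the
         * bulk registration overhead for tiny single-niobuf I/Os when the
         * server advertises support.
         */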
1539
1540         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1541                              opc == OST_READ ? 0 : short_io_size);
1542         if (opc == OST_READ)
1543                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1544                                      short_io_size);
1545
1546         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1547         if (rc) {
1548                 ptlrpc_request_free(req);
1549                 RETURN(rc);
1550         }
1551         osc_set_io_portal(req);
1552
1553         ptlrpc_at_set_req_timeout(req);
1554         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1555          * retry logic */
1556         req->rq_no_retry_einprogress = 1;
1557
1558         if (short_io_size != 0) {
1559                 desc = NULL;
1560                 short_io_buf = NULL;
1561                 goto no_bulk;
1562         }
1563
1564         desc = ptlrpc_prep_bulk_imp(req, page_count,
1565                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1566                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1567                         PTLRPC_BULK_PUT_SINK),
1568                 OST_BULK_PORTAL,
1569                 &ptlrpc_bulk_kiov_pin_ops);
1570
1571         if (desc == NULL)
1572                 GOTO(out, rc = -ENOMEM);
1573         /* NB the request now owns desc and will free it when the request is freed */
1574 no_bulk:
1575         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1576         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1577         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1578         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1579
1580         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1581
1582         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1583          * and from_kgid(), because these operations are asynchronous.
1584          * Fortunately, the oa variable already contains valid o_uid and o_gid
1585          * for these two operations, and filling them is enough for nrs-tbf,
1586          * see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set in order to
1587          * avoid breaking other process logic */
1588         body->oa.o_uid = oa->o_uid;
1589         body->oa.o_gid = oa->o_gid;
1590
1591         obdo_to_ioobj(oa, ioobj);
1592         ioobj->ioo_bufcnt = niocount;
1593         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1594          * bulks that might be sent for this request.  The actual number is
1595          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1596          * sends "max - 1" for compatibility with old clients that send "0", and
1597          * also so that the actual maximum is a power-of-two, not one less. LU-1431 */
1598         if (desc != NULL)
1599                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1600         else /* short io */
1601                 ioobj_max_brw_set(ioobj, 0);
1602
1603         if (short_io_size != 0) {
1604                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1605                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1606                         body->oa.o_flags = 0;
1607                 }
1608                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1609                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1610                        short_io_size);
1611                 if (opc == OST_WRITE) {
1612                         short_io_buf = req_capsule_client_get(pill,
1613                                                               &RMF_SHORT_IO);
1614                         LASSERT(short_io_buf != NULL);
1615                 }
1616         }
1617
1618         LASSERT(page_count > 0);
1619         pg_prev = pga[0];
1620         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1621                 struct brw_page *pg = pga[i];
1622                 int poff = pg->off & ~PAGE_MASK;
1623
1624                 LASSERT(pg->count > 0);
1625                 /* make sure there is no gap in the middle of page array */
1626                 LASSERTF(page_count == 1 ||
1627                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1628                           ergo(i > 0 && i < page_count - 1,
1629                                poff == 0 && pg->count == PAGE_SIZE)   &&
1630                           ergo(i == page_count - 1, poff == 0)),
1631                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1632                          i, page_count, pg, pg->off, pg->count);
1633                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1634                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1635                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1636                          i, page_count,
1637                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1638                          pg_prev->pg, page_private(pg_prev->pg),
1639                          pg_prev->pg->index, pg_prev->off);
1640                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1641                         (pg->flag & OBD_BRW_SRVLOCK));
1642                 if (short_io_size != 0 && opc == OST_WRITE) {
1643                         unsigned char *ptr = kmap_atomic(pg->pg);
1644
1645                         LASSERT(short_io_size >= requested_nob + pg->count);
1646                         memcpy(short_io_buf + requested_nob,
1647                                ptr + poff,
1648                                pg->count);
1649                         kunmap_atomic(ptr);
1650                 } else if (short_io_size == 0) {
1651                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1652                                                          pg->count);
1653                 }
1654                 requested_nob += pg->count;
1655
1656                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1657                         niobuf--;
1658                         niobuf->rnb_len += pg->count;
1659                 } else {
1660                         niobuf->rnb_offset = pg->off;
1661                         niobuf->rnb_len    = pg->count;
1662                         niobuf->rnb_flags  = pg->flag;
1663                 }
1664                 pg_prev = pg;
1665         }
1666
1667         LASSERTF((void *)(niobuf - niocount) ==
1668                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1669                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1670                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1671
1672         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1673         if (resend) {
1674                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1675                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1676                         body->oa.o_flags = 0;
1677                 }
1678                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1679         }
1680
1681         if (osc_should_shrink_grant(cli))
1682                 osc_shrink_grant_local(cli, &body->oa);
1683
1684         /* size[REQ_REC_OFF] still sizeof (*body) */
1685         if (opc == OST_WRITE) {
1686                 if (cli->cl_checksum &&
1687                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1688                         /* store cl_cksum_type in a local variable since
1689                          * it can be changed via lprocfs */
1690                         enum cksum_types cksum_type = cli->cl_cksum_type;
1691
1692                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1693                                 body->oa.o_flags = 0;
1694
1695                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1696                                                                 cksum_type);
1697                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1698
1699                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1700                                                   requested_nob, page_count,
1701                                                   pga, OST_WRITE,
1702                                                   &body->oa.o_cksum);
1703                         if (rc < 0) {
1704                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1705                                        rc);
1706                                 GOTO(out, rc);
1707                         }
1708                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1709                                body->oa.o_cksum);
1710
1711                         /* save this in 'oa', too, for later checking */
1712                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1713                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1714                                                            cksum_type);
1715                 } else {
1716                         /* clear out the checksum flag, in case this is a
1717                          * resend but cl_checksum is no longer set. b=11238 */
1718                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1719                 }
1720                 oa->o_cksum = body->oa.o_cksum;
1721                 /* 1 RC per niobuf */
1722                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1723                                      sizeof(__u32) * niocount);
1724         } else {
1725                 if (cli->cl_checksum &&
1726                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1727                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1728                                 body->oa.o_flags = 0;
1729                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1730                                 cli->cl_cksum_type);
1731                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1732                 }
1733
1734                 /* The client cksum has already been copied to the wire obdo in
1735                  * the previous lustre_set_wire_obdo(), so if a bulk-read is
1736                  * resent due to a cksum error, this allows the server to
1737                  * check+dump the pages on its side */
1738         }
1739         ptlrpc_request_set_replen(req);
1740
1741         aa = ptlrpc_req_async_args(aa, req);
1742         aa->aa_oa = oa;
1743         aa->aa_requested_nob = requested_nob;
1744         aa->aa_nio_count = niocount;
1745         aa->aa_page_count = page_count;
1746         aa->aa_resends = 0;
1747         aa->aa_ppga = pga;
1748         aa->aa_cli = cli;
1749         INIT_LIST_HEAD(&aa->aa_oaps);
1750
1751         *reqp = req;
1752         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1753         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1754                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1755                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1756         RETURN(0);
1757
1758  out:
1759         ptlrpc_req_finished(req);
1760         RETURN(rc);
1761 }
1762
1763 char dbgcksum_file_name[PATH_MAX];
1764
1765 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1766                                 struct brw_page **pga, __u32 server_cksum,
1767                                 __u32 client_cksum)
1768 {
1769         struct file *filp;
1770         int rc, i;
1771         unsigned int len;
1772         char *buf;
1773
1774         /* only keep a dump of the pages on the first error for the same range
1775          * in the file/fid, not during the resends/retries. */
1776         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1777                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1778                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1779                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1780                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1781                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1782                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1783                  pga[0]->off,
1784                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1785                  client_cksum, server_cksum);
1786         filp = filp_open(dbgcksum_file_name,
1787                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1788         if (IS_ERR(filp)) {
1789                 rc = PTR_ERR(filp);
1790                 if (rc == -EEXIST)
1791                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1792                                "checksum error: rc = %d\n", dbgcksum_file_name,
1793                                rc);
1794                 else
1795                         CERROR("%s: can't open to dump pages with checksum "
1796                                "error: rc = %d\n", dbgcksum_file_name, rc);
1797                 return;
1798         }
1799
1800         for (i = 0; i < page_count; i++) {
1801                 len = pga[i]->count;
1802                 buf = kmap(pga[i]->pg);
1803                 while (len != 0) {
1804                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1805                         if (rc < 0) {
1806                                 CERROR("%s: wanted to write %u bytes but got "
1807                                        "error %d\n", dbgcksum_file_name, len, rc);
1808                                 break;
1809                         }
1810                         len -= rc;
1811                         buf += rc;
1812                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1813                                dbgcksum_file_name, rc);
1814                 }
1815                 kunmap(pga[i]->pg);
1816         }
1817
1818         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1819         if (rc)
1820                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1821         filp_close(filp, NULL);
1822 }
1823
1824 static int
1825 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1826                      __u32 client_cksum, __u32 server_cksum,
1827                      struct osc_brw_async_args *aa)
1828 {
1829         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1830         enum cksum_types cksum_type;
1831         obd_dif_csum_fn *fn = NULL;
1832         int sector_size = 0;
1833         __u32 new_cksum;
1834         char *msg;
1835         int rc;
1836
1837         if (server_cksum == client_cksum) {
1838                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1839                 return 0;
1840         }
1841
1842         if (aa->aa_cli->cl_checksum_dump)
1843                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1844                                     server_cksum, client_cksum);
1845
1846         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1847                                            oa->o_flags : 0);
1848
1849         switch (cksum_type) {
1850         case OBD_CKSUM_T10IP512:
1851                 fn = obd_dif_ip_fn;
1852                 sector_size = 512;
1853                 break;
1854         case OBD_CKSUM_T10IP4K:
1855                 fn = obd_dif_ip_fn;
1856                 sector_size = 4096;
1857                 break;
1858         case OBD_CKSUM_T10CRC512:
1859                 fn = obd_dif_crc_fn;
1860                 sector_size = 512;
1861                 break;
1862         case OBD_CKSUM_T10CRC4K:
1863                 fn = obd_dif_crc_fn;
1864                 sector_size = 4096;
1865                 break;
1866         default:
1867                 break;
1868         }
1869
1870         if (fn)
1871                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1872                                              aa->aa_page_count, aa->aa_ppga,
1873                                              OST_WRITE, fn, sector_size,
1874                                              &new_cksum);
1875         else
1876                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1877                                        aa->aa_ppga, OST_WRITE, cksum_type,
1878                                        &new_cksum);
1879
1880         if (rc < 0)
1881                 msg = "failed to calculate the client write checksum";
1882         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1883                 msg = "the server did not use the checksum type specified in "
1884                       "the original request - likely a protocol problem";
1885         else if (new_cksum == server_cksum)
1886                 msg = "changed on the client after we checksummed it - "
1887                       "likely false positive due to mmap IO (bug 11742)";
1888         else if (new_cksum == client_cksum)
1889                 msg = "changed in transit before arrival at OST";
1890         else
1891                 msg = "changed in transit AND doesn't match the original - "
1892                       "likely false positive due to mmap IO (bug 11742)";
1893
1894         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1895                            DFID " object "DOSTID" extent [%llu-%llu], original "
1896                            "client csum %x (type %x), server csum %x (type %x),"
1897                            " client csum now %x\n",
1898                            obd_name, msg, libcfs_nid2str(peer->nid),
1899                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1900                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1901                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1902                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1903                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1904                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1905                            client_cksum,
1906                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1907                            server_cksum, cksum_type, new_cksum);
1908         return 1;
1909 }
1910
1911 /* Note: rc enters this function as the number of bytes transferred */
1912 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1913 {
1914         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1915         struct client_obd *cli = aa->aa_cli;
1916         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1917         const struct lnet_process_id *peer =
1918                 &req->rq_import->imp_connection->c_peer;
1919         struct ost_body *body;
1920         u32 client_cksum = 0;
1921         struct inode *inode;
1922         unsigned int blockbits = 0, blocksize = 0;
1923
1924         ENTRY;
1925
1926         if (rc < 0 && rc != -EDQUOT) {
1927                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1928                 RETURN(rc);
1929         }
1930
1931         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1932         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1933         if (body == NULL) {
1934                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1935                 RETURN(-EPROTO);
1936         }
1937
1938         /* set/clear over quota flag for a uid/gid/projid */
1939         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1940             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1941                 unsigned qid[LL_MAXQUOTAS] = {
1942                                          body->oa.o_uid, body->oa.o_gid,
1943                                          body->oa.o_projid };
1944                 CDEBUG(D_QUOTA,
1945                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1946                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1947                        body->oa.o_valid, body->oa.o_flags);
1948                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1949                                 body->oa.o_flags);
1950         }
1951
1952         osc_update_grant(cli, body);
1953
1954         if (rc < 0)
1955                 RETURN(rc);
1956
1957         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1958                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1959
1960         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1961                 if (rc > 0) {
1962                         CERROR("%s: unexpected positive size %d\n",
1963                                obd_name, rc);
1964                         RETURN(-EPROTO);
1965                 }
1966
1967                 if (req->rq_bulk != NULL &&
1968                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1969                         RETURN(-EAGAIN);
1970
1971                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1972                     check_write_checksum(&body->oa, peer, client_cksum,
1973                                          body->oa.o_cksum, aa))
1974                         RETURN(-EAGAIN);
1975
1976                 rc = check_write_rcs(req, aa->aa_requested_nob,
1977                                      aa->aa_nio_count, aa->aa_page_count,
1978                                      aa->aa_ppga);
1979                 GOTO(out, rc);
1980         }
1981
1982         /* The rest of this function executes only for OST_READs */
1983
1984         if (req->rq_bulk == NULL) {
1985                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1986                                           RCL_SERVER);
1987                 LASSERT(rc == req->rq_status);
1988         } else {
1989                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1990                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1991         }
1992         if (rc < 0)
1993                 GOTO(out, rc = -EAGAIN);
1994
1995         if (rc > aa->aa_requested_nob) {
1996                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1997                        rc, aa->aa_requested_nob);
1998                 RETURN(-EPROTO);
1999         }
2000
2001         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2002                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2003                        rc, req->rq_bulk->bd_nob_transferred);
2004                 RETURN(-EPROTO);
2005         }
2006
2007         if (req->rq_bulk == NULL) {
2008                 /* short io */
2009                 int nob, pg_count, i = 0;
2010                 unsigned char *buf;
2011
2012                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2013                 pg_count = aa->aa_page_count;
2014                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2015                                                    rc);
2016                 nob = rc;
2017                 while (nob > 0 && pg_count > 0) {
2018                         unsigned char *ptr;
2019                         int count = aa->aa_ppga[i]->count > nob ?
2020                                     nob : aa->aa_ppga[i]->count;
2021
2022                         CDEBUG(D_CACHE, "page %p count %d\n",
2023                                aa->aa_ppga[i]->pg, count);
2024                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2025                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2026                                count);
2027                         kunmap_atomic((void *) ptr);
2028
2029                         buf += count;
2030                         nob -= count;
2031                         i++;
2032                         pg_count--;
2033                 }
2034         }
2035
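        /* A reply shorter than requested is legal, e.g. when reading past
         * EOF; handle_short_read() is expected to zero-fill the tail of the
         * pages so that no stale data is exposed.
         */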
2036         if (rc < aa->aa_requested_nob)
2037                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2038
2039         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2040                 static int cksum_counter;
2041                 u32        server_cksum = body->oa.o_cksum;
2042                 char      *via = "";
2043                 char      *router = "";
2044                 enum cksum_types cksum_type;
2045                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2046                         body->oa.o_flags : 0;
2047
2048                 cksum_type = obd_cksum_type_unpack(o_flags);
2049                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2050                                           aa->aa_page_count, aa->aa_ppga,
2051                                           OST_READ, &client_cksum);
2052                 if (rc < 0)
2053                         GOTO(out, rc);
2054
2055                 if (req->rq_bulk != NULL &&
2056                     peer->nid != req->rq_bulk->bd_sender) {
2057                         via = " via ";
2058                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2059                 }
2060
2061                 if (server_cksum != client_cksum) {
2062                         struct ost_body *clbody;
2063                         u32 page_count = aa->aa_page_count;
2064
2065                         clbody = req_capsule_client_get(&req->rq_pill,
2066                                                         &RMF_OST_BODY);
2067                         if (cli->cl_checksum_dump)
2068                                 dump_all_bulk_pages(&clbody->oa, page_count,
2069                                                     aa->aa_ppga, server_cksum,
2070                                                     client_cksum);
2071
2072                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2073                                            "%s%s%s inode "DFID" object "DOSTID
2074                                            " extent [%llu-%llu], client %x, "
2075                                            "server %x, cksum_type %x\n",
2076                                            obd_name,
2077                                            libcfs_nid2str(peer->nid),
2078                                            via, router,
2079                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2080                                                 clbody->oa.o_parent_seq : 0ULL,
2081                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2082                                                 clbody->oa.o_parent_oid : 0,
2083                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2084                                                 clbody->oa.o_parent_ver : 0,
2085                                            POSTID(&body->oa.o_oi),
2086                                            aa->aa_ppga[0]->off,
2087                                            aa->aa_ppga[page_count-1]->off +
2088                                            aa->aa_ppga[page_count-1]->count - 1,
2089                                            client_cksum, server_cksum,
2090                                            cksum_type);
2091                         cksum_counter = 0;
2092                         aa->aa_oa->o_cksum = client_cksum;
2093                         rc = -EAGAIN;
2094                 } else {
2095                         cksum_counter++;
2096                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2097                         rc = 0;
2098                 }
2099         } else if (unlikely(client_cksum)) {
2100                 static int cksum_missed;
2101
2102                 cksum_missed++;
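                /* (x & -x) == x only for powers of two, so this logs on the
                 * 1st, 2nd, 4th, 8th, ... miss as a simple exponential
                 * rate limiter */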
2103                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2104                         CERROR("%s: checksum %u requested from %s but not sent\n",
2105                                obd_name, cksum_missed,
2106                                libcfs_nid2str(peer->nid));
2107         } else {
2108                 rc = 0;
2109         }
2110
2111         inode = page2inode(aa->aa_ppga[0]->pg);
2112         if (inode == NULL) {
2113                 /* Try to get reference to inode from cl_page if we are
2114                  * dealing with direct IO, as handled pages are not
2115                  * actual page cache pages.
2116                  */
2117                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2118
2119                 inode = oap2cl_page(oap)->cp_inode;
2120                 if (inode) {
2121                         blockbits = inode->i_blkbits;
2122                         blocksize = 1 << blockbits;
2123                 }
2124         }
2125         if (inode && IS_ENCRYPTED(inode)) {
2126                 int idx;
2127
2128                 if (!llcrypt_has_encryption_key(inode)) {
2129                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2130                         GOTO(out, rc);
2131                 }
2132                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2133                         struct brw_page *pg = aa->aa_ppga[idx];
2134                         unsigned int offs = 0;
2135
2136                         while (offs < PAGE_SIZE) {
2137                                 /* do not decrypt if page is all 0s */
2138                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2139                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2140                                         /* if the page is empty, tell the
2141                                          * upper layers (ll_io_zero_page) by
2142                                          * clearing PagePrivate2
2143                                          */
2144                                         if (!offs)
2145                                                 ClearPagePrivate2(pg->pg);
2146                                         break;
2147                                 }
2148
2149                                 if (blockbits) {
2150                                         /* This is the direct IO case. Directly
2151                                          * call the decrypt function that takes
2152                                          * the inode as an input parameter. The
2153                                          * page does not need to be locked.
2154                                          */
2155                                         u64 lblk_num =
2156                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2157                                                      (PAGE_SHIFT - blockbits)) +
2158                                                        (offs >> blockbits);
2159                                         unsigned int i;
2160
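                                        /* lblk_num is the file-logical block
                                         * number of the first block in this
                                         * unit, e.g. with 4096-byte pages,
                                         * blockbits = 9 (512-byte blocks),
                                         * pg->off = 8192 and offs = 1024:
                                         * (2 << 3) + 2 = 18.
                                         */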
2161                                         for (i = offs;
2162                                              i < offs +
2163                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2164                                              i += blocksize, lblk_num++) {
2165                                                 rc =
2166                                                   llcrypt_decrypt_block_inplace(
2167                                                           inode, pg->pg,
2168                                                           blocksize, i,
2169                                                           lblk_num);
2170                                                 if (rc)
2171                                                         break;
2172                                         }
2173                                 } else {
2174                                         rc = llcrypt_decrypt_pagecache_blocks(
2175                                                 pg->pg,
2176                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2177                                                 offs);
2178                                 }
2179                                 if (rc)
2180                                         GOTO(out, rc);
2181
2182                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2183                         }
2184                 }
2185         }
2186
2187 out:
2188         if (rc >= 0)
2189                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2190                                      aa->aa_oa, &body->oa);
2191
2192         RETURN(rc);
2193 }
2194
2195 static int osc_brw_redo_request(struct ptlrpc_request *request,
2196                                 struct osc_brw_async_args *aa, int rc)
2197 {
2198         struct ptlrpc_request *new_req;
2199         struct osc_brw_async_args *new_aa;
2200         struct osc_async_page *oap;
2201         ENTRY;
2202
2203         /* The message below is checked in replay-ost-single.sh test_8ae */
2204         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2205                   "redo for recoverable error %d", rc);
2206
2207         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2208                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2209                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2210                                   aa->aa_ppga, &new_req, 1);
2211         if (rc)
2212                 RETURN(rc);
2213
2214         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2215                 if (oap->oap_request != NULL) {
2216                         LASSERTF(request == oap->oap_request,
2217                                  "request %p != oap_request %p\n",
2218                                  request, oap->oap_request);
2219                 }
2220         }
2221         /*
2222          * The new request takes over pga and oaps from the old request.
2223          * Note that copying a list_head doesn't work; it must be moved.
2224          */
2225         aa->aa_resends++;
2226         new_req->rq_interpret_reply = request->rq_interpret_reply;
2227         new_req->rq_async_args = request->rq_async_args;
2228         new_req->rq_commit_cb = request->rq_commit_cb;
2229         /* cap the resend delay to the current request timeout; this is similar
2230          * to what ptlrpc does (see after_reply()) */
2231         if (aa->aa_resends > new_req->rq_timeout)
2232                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2233         else
2234                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
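        /* i.e. the delay before the resend goes out grows linearly with the
         * resend count (1s, 2s, 3s, ...) and is capped at rq_timeout */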
2235         new_req->rq_generation_set = 1;
2236         new_req->rq_import_generation = request->rq_import_generation;
2237
2238         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2239
2240         INIT_LIST_HEAD(&new_aa->aa_oaps);
2241         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2242         INIT_LIST_HEAD(&new_aa->aa_exts);
2243         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2244         new_aa->aa_resends = aa->aa_resends;
2245
2246         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2247                 if (oap->oap_request) {
2248                         ptlrpc_req_finished(oap->oap_request);
2249                         oap->oap_request = ptlrpc_request_addref(new_req);
2250                 }
2251         }
2252
2253         /* XXX: This code will run into problems if we ever support adding
2254          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2255          * waiting for all of them to finish. We should inherit the request
2256          * set from the old request. */
2257         ptlrpcd_add_req(new_req);
2258
2259         DEBUG_REQ(D_INFO, new_req, "new request");
2260         RETURN(0);
2261 }
2262
2263 /*
2264  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2265  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2266  * fine for our small page arrays and doesn't require allocation.  It's an
2267  * insertion sort that swaps elements that are strides apart, shrinking the
2268  * stride down until it's '1' and the array is sorted.
2269  */
2270 static void sort_brw_pages(struct brw_page **array, int num)
2271 {
2272         int stride, i, j;
2273         struct brw_page *tmp;
2274
2275         if (num == 1)
2276                 return;
2277         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2278                 ;
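        /* strides follow Knuth's 3h+1 sequence: 1, 4, 13, 40, 121, ...
         * e.g. for num = 100 the loop above exits with stride = 121, which
         * the first "stride /= 3" below brings back to 40 */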
2279
2280         do {
2281                 stride /= 3;
2282                 for (i = stride ; i < num ; i++) {
2283                         tmp = array[i];
2284                         j = i;
2285                         while (j >= stride && array[j - stride]->off > tmp->off) {
2286                                 array[j] = array[j - stride];
2287                                 j -= stride;
2288                         }
2289                         array[j] = tmp;
2290                 }
2291         } while (stride > 1);
2292 }
2293
2294 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2295 {
2296         LASSERT(ppga != NULL);
2297         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2298 }
2299
2300 static int brw_interpret(const struct lu_env *env,
2301                          struct ptlrpc_request *req, void *args, int rc)
2302 {
2303         struct osc_brw_async_args *aa = args;
2304         struct osc_extent *ext;
2305         struct osc_extent *tmp;
2306         struct client_obd *cli = aa->aa_cli;
2307         unsigned long transferred = 0;
2308
2309         ENTRY;
2310
2311         rc = osc_brw_fini_request(req, rc);
2312         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2313
2314         /* restore clear text pages */
2315         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2316
2317         /*
2318          * When server returns -EINPROGRESS, client should always retry
2319          * regardless of the number of times the bulk was resent already.
2320          */
2321         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2322                 if (req->rq_import_generation !=
2323                     req->rq_import->imp_generation) {
2324                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2325                                ""DOSTID", rc = %d.\n",
2326                                req->rq_import->imp_obd->obd_name,
2327                                POSTID(&aa->aa_oa->o_oi), rc);
2328                 } else if (rc == -EINPROGRESS ||
2329                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2330                         rc = osc_brw_redo_request(req, aa, rc);
2331                 } else {
2332                         CERROR("%s: too many resent retries for object: "
2333                                "%llu:%llu, rc = %d.\n",
2334                                req->rq_import->imp_obd->obd_name,
2335                                POSTID(&aa->aa_oa->o_oi), rc);
2336                 }
2337
2338                 if (rc == 0)
2339                         RETURN(0);
2340                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2341                         rc = -EIO;
2342         }
2343
2344         if (rc == 0) {
2345                 struct obdo *oa = aa->aa_oa;
2346                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2347                 unsigned long valid = 0;
2348                 struct cl_object *obj;
2349                 struct osc_async_page *last;
2350
2351                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2352                 obj = osc2cl(last->oap_obj);
2353
2354                 cl_object_attr_lock(obj);
2355                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2356                         attr->cat_blocks = oa->o_blocks;
2357                         valid |= CAT_BLOCKS;
2358                 }
2359                 if (oa->o_valid & OBD_MD_FLMTIME) {
2360                         attr->cat_mtime = oa->o_mtime;
2361                         valid |= CAT_MTIME;
2362                 }
2363                 if (oa->o_valid & OBD_MD_FLATIME) {
2364                         attr->cat_atime = oa->o_atime;
2365                         valid |= CAT_ATIME;
2366                 }
2367                 if (oa->o_valid & OBD_MD_FLCTIME) {
2368                         attr->cat_ctime = oa->o_ctime;
2369                         valid |= CAT_CTIME;
2370                 }
2371
2372                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2373                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2374                         loff_t last_off = last->oap_count + last->oap_obj_off +
2375                                 last->oap_page_off;
2376
2377                         /* Change file size if this is an out of quota or
2378                          * direct IO write and it extends the file size */
2379                         if (loi->loi_lvb.lvb_size < last_off) {
2380                                 attr->cat_size = last_off;
2381                                 valid |= CAT_SIZE;
2382                         }
2383                         /* Extend KMS if it's not a lockless write */
2384                         if (loi->loi_kms < last_off &&
2385                             oap2osc_page(last)->ops_srvlock == 0) {
2386                                 attr->cat_kms = last_off;
2387                                 valid |= CAT_KMS;
2388                         }
2389                 }
2390
2391                 if (valid != 0)
2392                         cl_object_attr_update(env, obj, attr, valid);
2393                 cl_object_attr_unlock(obj);
2394         }
2395         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2396         aa->aa_oa = NULL;
2397
2398         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2399                 osc_inc_unstable_pages(req);
2400
2401         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2402                 list_del_init(&ext->oe_link);
2403                 osc_extent_finish(env, ext, 1,
2404                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2405         }
2406         LASSERT(list_empty(&aa->aa_exts));
2407         LASSERT(list_empty(&aa->aa_oaps));
2408
2409         transferred = (req->rq_bulk == NULL ? /* short io */
2410                        aa->aa_requested_nob :
2411                        req->rq_bulk->bd_nob_transferred);
2412
2413         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2414         ptlrpc_lprocfs_brw(req, transferred);
2415
2416         spin_lock(&cli->cl_loi_list_lock);
2417         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2418          * is called so we know whether to go to sync BRWs or wait for more
2419          * RPCs to complete */
2420         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2421                 cli->cl_w_in_flight--;
2422         else
2423                 cli->cl_r_in_flight--;
2424         osc_wake_cache_waiters(cli);
2425         spin_unlock(&cli->cl_loi_list_lock);
2426
2427         osc_io_unplug(env, cli, NULL);
2428         RETURN(rc);
2429 }
2430
2431 static void brw_commit(struct ptlrpc_request *req)
2432 {
2433         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2434          * this callback being invoked via rq_commit_cb, we need to ensure
2435          * osc_dec_unstable_pages is still called. Otherwise unstable
2436          * pages may be leaked. */
2437         spin_lock(&req->rq_lock);
2438         if (likely(req->rq_unstable)) {
2439                 req->rq_unstable = 0;
2440                 spin_unlock(&req->rq_lock);
2441
2442                 osc_dec_unstable_pages(req);
2443         } else {
2444                 req->rq_committed = 1;
2445                 spin_unlock(&req->rq_lock);
2446         }
2447 }
2448
2449 /**
2450  * Build an RPC from the list of extents @ext_list. The caller must ensure
2451  * that the total number of pages in this list does not exceed the max pages
2452  * per RPC. Extents in the list must be in OES_RPC state.
2453  */
2454 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2455                   struct list_head *ext_list, int cmd)
2456 {
2457         struct ptlrpc_request           *req = NULL;
2458         struct osc_extent               *ext;
2459         struct brw_page                 **pga = NULL;
2460         struct osc_brw_async_args       *aa = NULL;
2461         struct obdo                     *oa = NULL;
2462         struct osc_async_page           *oap;
2463         struct osc_object               *obj = NULL;
2464         struct cl_req_attr              *crattr = NULL;
2465         loff_t                          starting_offset = OBD_OBJECT_EOF;
2466         loff_t                          ending_offset = 0;
2467         /* '1' for consistency with code that checks !mpflag to restore */
2468         int                             mpflag = 1;
2469         int                             mem_tight = 0;
2470         int                             page_count = 0;
2471         bool                            soft_sync = false;
2472         bool                            ndelay = false;
2473         int                             i;
2474         int                             grant = 0;
2475         int                             rc;
2476         __u32                           layout_version = 0;
2477         LIST_HEAD(rpc_list);
2478         struct ost_body                 *body;
2479         ENTRY;
2480         LASSERT(!list_empty(ext_list));
2481
2482         /* add pages into rpc_list to build BRW rpc */
2483         list_for_each_entry(ext, ext_list, oe_link) {
2484                 LASSERT(ext->oe_state == OES_RPC);
2485                 mem_tight |= ext->oe_memalloc;
2486                 grant += ext->oe_grants;
2487                 page_count += ext->oe_nr_pages;
2488                 layout_version = max(layout_version, ext->oe_layout_version);
2489                 if (obj == NULL)
2490                         obj = ext->oe_obj;
2491         }
2492
2493         soft_sync = osc_over_unstable_soft_limit(cli);
2494         if (mem_tight)
2495                 mpflag = memalloc_noreclaim_save();
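        /* if any extent was marked memalloc, build the whole RPC with
         * PF_MEMALLOC semantics so that allocations made here do not recurse
         * into reclaim, which could be waiting on these very pages */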
2496
2497         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2498         if (pga == NULL)
2499                 GOTO(out, rc = -ENOMEM);
2500
2501         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2502         if (oa == NULL)
2503                 GOTO(out, rc = -ENOMEM);
2504
2505         i = 0;
2506         list_for_each_entry(ext, ext_list, oe_link) {
2507                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2508                         if (mem_tight)
2509                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2510                         if (soft_sync)
2511                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2512                         pga[i] = &oap->oap_brw_page;
2513                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2514                         i++;
2515
2516                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2517                         if (starting_offset == OBD_OBJECT_EOF ||
2518                             starting_offset > oap->oap_obj_off)
2519                                 starting_offset = oap->oap_obj_off;
2520                         else
2521                                 LASSERT(oap->oap_page_off == 0);
2522                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2523                                 ending_offset = oap->oap_obj_off +
2524                                                 oap->oap_count;
2525                         else
2526                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2527                                         PAGE_SIZE);
2528                 }
2529                 if (ext->oe_ndelay)
2530                         ndelay = true;
2531         }
2532
2533         /* first page in the list */
2534         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2535
2536         crattr = &osc_env_info(env)->oti_req_attr;
2537         memset(crattr, 0, sizeof(*crattr));
2538         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2539         crattr->cra_flags = ~0ULL;
2540         crattr->cra_page = oap2cl_page(oap);
2541         crattr->cra_oa = oa;
2542         cl_req_attr_set(env, osc2cl(obj), crattr);
2543
2544         if (cmd == OBD_BRW_WRITE) {
2545                 oa->o_grant_used = grant;
2546                 if (layout_version > 0) {
2547                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2548                                PFID(&oa->o_oi.oi_fid), layout_version);
2549
2550                         oa->o_layout_version = layout_version;
2551                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2552                 }
2553         }
2554
2555         sort_brw_pages(pga, page_count);
2556         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2557         if (rc != 0) {
2558                 CERROR("prep_req failed: %d\n", rc);
2559                 GOTO(out, rc);
2560         }
2561
2562         req->rq_commit_cb = brw_commit;
2563         req->rq_interpret_reply = brw_interpret;
2564         req->rq_memalloc = mem_tight != 0;
2565         oap->oap_request = ptlrpc_request_addref(req);
2566         if (ndelay) {
2567                 req->rq_no_resend = req->rq_no_delay = 1;
2568                 /* XXX: probably set a shorter timeout value to handle
2569                  * ETIMEDOUT in brw_interpret() correctly. */
2570                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2571         }
2572
2573         /* Need to update the timestamps after the request is built in case
2574          * we race with setattr (locally or in the queue at the OST).  If the
2575          * OST gets a later setattr before an earlier BRW (as determined by the
2576          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2577          * is no obvious way to do this in a single call.  bug 10150 */
2578         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2579         crattr->cra_oa = &body->oa;
2580         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2581         cl_req_attr_set(env, osc2cl(obj), crattr);
2582         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2583
2584         aa = ptlrpc_req_async_args(aa, req);
2585         INIT_LIST_HEAD(&aa->aa_oaps);
2586         list_splice_init(&rpc_list, &aa->aa_oaps);
2587         INIT_LIST_HEAD(&aa->aa_exts);
2588         list_splice_init(ext_list, &aa->aa_exts);
2589
2590         spin_lock(&cli->cl_loi_list_lock);
2591         starting_offset >>= PAGE_SHIFT;
2592         if (cmd == OBD_BRW_READ) {
2593                 cli->cl_r_in_flight++;
2594                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2595                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2596                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2597                                       starting_offset + 1);
2598         } else {
2599                 cli->cl_w_in_flight++;
2600                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2601                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2602                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2603                                       starting_offset + 1);
2604         }
2605         spin_unlock(&cli->cl_loi_list_lock);
2606
2607         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2608                   page_count, aa, cli->cl_r_in_flight,
2609                   cli->cl_w_in_flight);
2610         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2611
2612         ptlrpcd_add_req(req);
2613         rc = 0;
2614         EXIT;
2615
2616 out:
2617         if (mem_tight)
2618                 memalloc_noreclaim_restore(mpflag);
2619
2620         if (rc != 0) {
2621                 LASSERT(req == NULL);
2622
2623                 if (oa)
2624                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2625                 if (pga) {
2626                         osc_release_bounce_pages(pga, page_count);
2627                         osc_release_ppga(pga, page_count);
2628                 }
2629                 /* this should happen rarely and is pretty bad; it makes the
2630                  * pending list not follow the dirty order */
2631                 while (!list_empty(ext_list)) {
2632                         ext = list_entry(ext_list->next, struct osc_extent,
2633                                          oe_link);
2634                         list_del_init(&ext->oe_link);
2635                         osc_extent_finish(env, ext, 0, rc);
2636                 }
2637         }
2638         RETURN(rc);
2639 }
2640
2641 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2642 {
2643         int set = 0;
2644
2645         LASSERT(lock != NULL);
2646
2647         lock_res_and_lock(lock);
2648
2649         if (lock->l_ast_data == NULL)
2650                 lock->l_ast_data = data;
2651         if (lock->l_ast_data == data)
2652                 set = 1;
2653
2654         unlock_res_and_lock(lock);
2655
2656         return set;
2657 }
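
/*
 * Editorial sketch (not part of the original file): osc_set_lock_data()
 * either attaches @data to an unowned lock or confirms the lock already
 * carries it, so callers use the return value to decide whether to keep a
 * matched lock. The pattern, as used in osc_enqueue_base() and
 * osc_match_base() below:
 *
 *      if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
 *              // the lock is (now) ours: keep the matched handle
 *      } else {
 *              // owned by another object: drop our match references
 *              ldlm_lock_decref(&lockh, mode);
 *              LDLM_LOCK_PUT(matched);
 *      }
 */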
2658
2659 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2660                      void *cookie, struct lustre_handle *lockh,
2661                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2662                      int errcode)
2663 {
2664         bool intent = *flags & LDLM_FL_HAS_INTENT;
2665         int rc;
2666         ENTRY;
2667
2668         /* The request was created before ldlm_cli_enqueue call. */
2669         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2670                 struct ldlm_reply *rep;
2671
2672                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2673                 LASSERT(rep != NULL);
2674
2675                 rep->lock_policy_res1 =
2676                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2677                 if (rep->lock_policy_res1)
2678                         errcode = rep->lock_policy_res1;
2679                 if (!speculative)
2680                         *flags |= LDLM_FL_LVB_READY;
2681         } else if (errcode == ELDLM_OK) {
2682                 *flags |= LDLM_FL_LVB_READY;
2683         }
2684
2685         /* Call the update callback. */
2686         rc = (*upcall)(cookie, lockh, errcode);
2687
2688         /* release the reference taken in ldlm_cli_enqueue() */
2689         if (errcode == ELDLM_LOCK_MATCHED)
2690                 errcode = ELDLM_OK;
2691         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2692                 ldlm_lock_decref(lockh, mode);
2693
2694         RETURN(rc);
2695 }
2696
2697 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2698                           void *args, int rc)
2699 {
2700         struct osc_enqueue_args *aa = args;
2701         struct ldlm_lock *lock;
2702         struct lustre_handle *lockh = &aa->oa_lockh;
2703         enum ldlm_mode mode = aa->oa_mode;
2704         struct ost_lvb *lvb = aa->oa_lvb;
2705         __u32 lvb_len = sizeof(*lvb);
2706         __u64 flags = 0;
2707         struct ldlm_enqueue_info einfo = {
2708                 .ei_type = aa->oa_type,
2709                 .ei_mode = mode,
2710         };
2711
2712         ENTRY;
2713
2714         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2715          * be valid. */
2716         lock = ldlm_handle2lock(lockh);
2717         LASSERTF(lock != NULL,
2718                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2719                  lockh->cookie, req, aa);
2720
2721         /* Take an additional reference so that a blocking AST that
2722          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2723          * to arrive after an upcall has been executed by
2724          * osc_enqueue_fini(). */
2725         ldlm_lock_addref(lockh, mode);
2726
2727         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2728         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2729
2730         /* Let the CP AST grant the lock first. */
2731         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2732
2733         if (aa->oa_speculative) {
2734                 LASSERT(aa->oa_lvb == NULL);
2735                 LASSERT(aa->oa_flags == NULL);
2736                 aa->oa_flags = &flags;
2737         }
2738
2739         /* Complete obtaining the lock procedure. */
2740         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2741                                    lvb, lvb_len, lockh, rc);
2742         /* Complete osc stuff. */
2743         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2744                               aa->oa_flags, aa->oa_speculative, rc);
2745
2746         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2747
2748         ldlm_lock_decref(lockh, mode);
2749         LDLM_LOCK_PUT(lock);
2750         RETURN(rc);
2751 }
2752
2753 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2754  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2755  * other synchronous requests; however, holding some locks while trying to
2756  * obtain others may take a considerable amount of time in case of OST failure,
2757  * and when other sync requests cannot get a lock released by a client, that
2758  * client is evicted from the cluster -- such scenarios make life difficult, so
2759  * release locks just after they are obtained. */
2760 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2761                      __u64 *flags, union ldlm_policy_data *policy,
2762                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2763                      void *cookie, struct ldlm_enqueue_info *einfo,
2764                      struct ptlrpc_request_set *rqset, int async,
2765                      bool speculative)
2766 {
2767         struct obd_device *obd = exp->exp_obd;
2768         struct lustre_handle lockh = { 0 };
2769         struct ptlrpc_request *req = NULL;
2770         int intent = *flags & LDLM_FL_HAS_INTENT;
2771         __u64 match_flags = *flags;
2772         enum ldlm_mode mode;
2773         int rc;
2774         ENTRY;
2775
2776         /* Filesystem lock extents are extended to page boundaries so that
2777          * dealing with the page cache is a little smoother.  */
2778         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2779         policy->l_extent.end |= ~PAGE_MASK;
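
        /*
         * Worked example (illustrative, assuming a 4 KiB PAGE_SIZE, so
         * ~PAGE_MASK == 0xfff): a byte range [5000, 9000] is widened to the
         * whole pages [4096, 12287]:
         *
         *      start: 5000 - (5000 & 0xfff) = 5000 - 904 = 4096
         *      end:   9000 | 0xfff                       = 12287
         */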
2780
2781         /* Next, search for already existing extent locks that will cover us */
2782         /* If we're trying to read, we also search for an existing PW lock.  The
2783          * VFS and page cache already protect us locally, so lots of readers/
2784          * writers can share a single PW lock.
2785          *
2786          * There are problems with conversion deadlocks, so instead of
2787          * converting a read lock to a write lock, we'll just enqueue a new
2788          * one.
2789          *
2790          * At some point we should cancel the read lock instead of making them
2791          * send us a blocking callback, but there are problems with canceling
2792          * locks out from other users right now, too. */
2793         mode = einfo->ei_mode;
2794         if (einfo->ei_mode == LCK_PR)
2795                 mode |= LCK_PW;
2796         /* Normal lock requests must wait for the LVB to be ready before
2797          * matching a lock; speculative lock requests do not need to,
2798          * because they will not actually use the lock. */
2799         if (!speculative)
2800                 match_flags |= LDLM_FL_LVB_READY;
2801         if (intent != 0)
2802                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2803         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2804                                einfo->ei_type, policy, mode, &lockh);
2805         if (mode) {
2806                 struct ldlm_lock *matched;
2807
2808                 if (*flags & LDLM_FL_TEST_LOCK)
2809                         RETURN(ELDLM_OK);
2810
2811                 matched = ldlm_handle2lock(&lockh);
2812                 if (speculative) {
2813                         /* This DLM lock request is speculative, and does not
2814                          * have an associated IO request. Therefore, if there
2815                          * is already a DLM lock, it will just inform the
2816                          * caller to cancel the request for this stripe. */
2817                         lock_res_and_lock(matched);
2818                         if (ldlm_extent_equal(&policy->l_extent,
2819                             &matched->l_policy_data.l_extent))
2820                                 rc = -EEXIST;
2821                         else
2822                                 rc = -ECANCELED;
2823                         unlock_res_and_lock(matched);
2824
2825                         ldlm_lock_decref(&lockh, mode);
2826                         LDLM_LOCK_PUT(matched);
2827                         RETURN(rc);
2828                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2829                         *flags |= LDLM_FL_LVB_READY;
2830
2831                         /* We already have a lock, and it's referenced. */
2832                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2833
2834                         ldlm_lock_decref(&lockh, mode);
2835                         LDLM_LOCK_PUT(matched);
2836                         RETURN(ELDLM_OK);
2837                 } else {
2838                         ldlm_lock_decref(&lockh, mode);
2839                         LDLM_LOCK_PUT(matched);
2840                 }
2841         }
2842
2843         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2844                 RETURN(-ENOLCK);
2845
2846         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2847         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2848
2849         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2850                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2851         if (async) {
2852                 if (!rc) {
2853                         struct osc_enqueue_args *aa;
2854                         aa = ptlrpc_req_async_args(aa, req);
2855                         aa->oa_exp         = exp;
2856                         aa->oa_mode        = einfo->ei_mode;
2857                         aa->oa_type        = einfo->ei_type;
2858                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2859                         aa->oa_upcall      = upcall;
2860                         aa->oa_cookie      = cookie;
2861                         aa->oa_speculative = speculative;
2862                         if (!speculative) {
2863                                 aa->oa_flags  = flags;
2864                                 aa->oa_lvb    = lvb;
2865                         } else {
2866                                 /* speculative locks essentially enqueue
2867                                  * a DLM lock in advance, so we don't care
2868                                  * about the result of the enqueue. */
2869                                 aa->oa_lvb    = NULL;
2870                                 aa->oa_flags  = NULL;
2871                         }
2872
2873                         req->rq_interpret_reply = osc_enqueue_interpret;
2874                         ptlrpc_set_add_req(rqset, req);
2875                 }
2876                 RETURN(rc);
2877         }
2878
2879         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2880                               flags, speculative, rc);
2881
2882         RETURN(rc);
2883 }
2884
2885 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2886                    struct ldlm_res_id *res_id, enum ldlm_type type,
2887                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2888                    __u64 *flags, struct osc_object *obj,
2889                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2890 {
2891         struct obd_device *obd = exp->exp_obd;
2892         __u64 lflags = *flags;
2893         enum ldlm_mode rc;
2894         ENTRY;
2895
2896         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2897                 RETURN(-EIO);
2898
2899         /* Filesystem lock extents are extended to page boundaries so that
2900          * dealing with the page cache is a little smoother */
2901         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2902         policy->l_extent.end |= ~PAGE_MASK;
2903
2904         /* Next, search for already existing extent locks that will cover us */
2905         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2906                                         res_id, type, policy, mode, lockh,
2907                                         match_flags);
2908         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2909                 RETURN(rc);
2910
2911         if (obj != NULL) {
2912                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2913
2914                 LASSERT(lock != NULL);
2915                 if (osc_set_lock_data(lock, obj)) {
2916                         lock_res_and_lock(lock);
2917                         if (!ldlm_is_lvb_cached(lock)) {
2918                                 LASSERT(lock->l_ast_data == obj);
2919                                 osc_lock_lvb_update(env, obj, lock, NULL);
2920                                 ldlm_set_lvb_cached(lock);
2921                         }
2922                         unlock_res_and_lock(lock);
2923                 } else {
2924                         ldlm_lock_decref(lockh, rc);
2925                         rc = 0;
2926                 }
2927                 LDLM_LOCK_PUT(lock);
2928         }
2929         RETURN(rc);
2930 }
2931
2932 static int osc_statfs_interpret(const struct lu_env *env,
2933                                 struct ptlrpc_request *req, void *args, int rc)
2934 {
2935         struct osc_async_args *aa = args;
2936         struct obd_statfs *msfs;
2937
2938         ENTRY;
2939         if (rc == -EBADR)
2940                 /*
2941                  * The request has in fact never been sent due to issues at
2942                  * a higher level (LOV).  Exit immediately since the caller
2943                  * is aware of the problem and takes care of the clean up.
2944                  */
2945                 RETURN(rc);
2946
2947         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2948             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2949                 GOTO(out, rc = 0);
2950
2951         if (rc != 0)
2952                 GOTO(out, rc);
2953
2954         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2955         if (msfs == NULL)
2956                 GOTO(out, rc = -EPROTO);
2957
2958         *aa->aa_oi->oi_osfs = *msfs;
2959 out:
2960         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2961
2962         RETURN(rc);
2963 }
2964
2965 static int osc_statfs_async(struct obd_export *exp,
2966                             struct obd_info *oinfo, time64_t max_age,
2967                             struct ptlrpc_request_set *rqset)
2968 {
2969         struct obd_device     *obd = class_exp2obd(exp);
2970         struct ptlrpc_request *req;
2971         struct osc_async_args *aa;
2972         int rc;
2973         ENTRY;
2974
2975         if (obd->obd_osfs_age >= max_age) {
2976                 CDEBUG(D_SUPER,
2977                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2978                        obd->obd_name, &obd->obd_osfs,
2979                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2980                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2981                 spin_lock(&obd->obd_osfs_lock);
2982                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2983                 spin_unlock(&obd->obd_osfs_lock);
2984                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2985                 if (oinfo->oi_cb_up)
2986                         oinfo->oi_cb_up(oinfo, 0);
2987
2988                 RETURN(0);
2989         }
2990
2991         /* We could possibly pass max_age in the request (as an absolute
2992          * timestamp or a "seconds.usec ago") so the target can avoid doing
2993          * extra calls into the filesystem if that isn't necessary (e.g.
2994          * during mount that would help a bit).  Having relative timestamps
2995          * is not so great if request processing is slow, while absolute
2996          * timestamps are not ideal because they need time synchronization. */
2997         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2998         if (req == NULL)
2999                 RETURN(-ENOMEM);
3000
3001         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3002         if (rc) {
3003                 ptlrpc_request_free(req);
3004                 RETURN(rc);
3005         }
3006         ptlrpc_request_set_replen(req);
3007         req->rq_request_portal = OST_CREATE_PORTAL;
3008         ptlrpc_at_set_req_timeout(req);
3009
3010         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3011                 /* procfs requests should not wait for statfs, to avoid deadlock */
3012                 req->rq_no_resend = 1;
3013                 req->rq_no_delay = 1;
3014         }
3015
3016         req->rq_interpret_reply = osc_statfs_interpret;
3017         aa = ptlrpc_req_async_args(aa, req);
3018         aa->aa_oi = oinfo;
3019
3020         ptlrpc_set_add_req(rqset, req);
3021         RETURN(0);
3022 }
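
/*
 * Illustrative usage sketch (not from this file): a caller willing to accept
 * cached statfs data up to N seconds old would pass a cutoff such as
 *
 *      time64_t max_age = ktime_get_seconds() - N;
 *
 * so any obd->obd_osfs snapshot with obd_osfs_age >= max_age is served from
 * the cache in osc_statfs_async() above without an OST_STATFS RPC.
 */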
3023
3024 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3025                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3026 {
3027         struct obd_device     *obd = class_exp2obd(exp);
3028         struct obd_statfs     *msfs;
3029         struct ptlrpc_request *req;
3030         struct obd_import     *imp, *imp0;
3031         int rc;
3032         ENTRY;
3033
3034         /* Since the request might also come from lprocfs, we need to
3035          * sync this with client_disconnect_export() (Bug 15684).
3036          */
3037         with_imp_locked(obd, imp0, rc)
3038                 imp = class_import_get(imp0);
3039         if (rc)
3040                 RETURN(rc);
3041
3042         /* We could possibly pass max_age in the request (as an absolute
3043          * timestamp or a "seconds.usec ago") so the target can avoid doing
3044          * extra calls into the filesystem if that isn't necessary (e.g.
3045          * during mount that would help a bit).  Having relative timestamps
3046          * is not so great if request processing is slow, while absolute
3047          * timestamps are not ideal because they need time synchronization. */
3048         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3049
3050         class_import_put(imp);
3051
3052         if (req == NULL)
3053                 RETURN(-ENOMEM);
3054
3055         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3056         if (rc) {
3057                 ptlrpc_request_free(req);
3058                 RETURN(rc);
3059         }
3060         ptlrpc_request_set_replen(req);
3061         req->rq_request_portal = OST_CREATE_PORTAL;
3062         ptlrpc_at_set_req_timeout(req);
3063
3064         if (flags & OBD_STATFS_NODELAY) {
3065                 /* procfs requests should not wait for statfs, to avoid deadlock */
3066                 req->rq_no_resend = 1;
3067                 req->rq_no_delay = 1;
3068         }
3069
3070         rc = ptlrpc_queue_wait(req);
3071         if (rc)
3072                 GOTO(out, rc);
3073
3074         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3075         if (msfs == NULL)
3076                 GOTO(out, rc = -EPROTO);
3077
3078         *osfs = *msfs;
3079
3080         EXIT;
3081 out:
3082         ptlrpc_req_finished(req);
3083         return rc;
3084 }
3085
3086 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3087                          void *karg, void __user *uarg)
3088 {
3089         struct obd_device *obd = exp->exp_obd;
3090         struct obd_ioctl_data *data = karg;
3091         int rc = 0;
3092
3093         ENTRY;
3094         if (!try_module_get(THIS_MODULE)) {
3095                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3096                        module_name(THIS_MODULE));
3097                 return -EINVAL;
3098         }
3099         switch (cmd) {
3100         case OBD_IOC_CLIENT_RECOVER:
3101                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3102                                            data->ioc_inlbuf1, 0);
3103                 if (rc > 0)
3104                         rc = 0;
3105                 break;
3106         case IOC_OSC_SET_ACTIVE:
3107                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3108                                               data->ioc_offset);
3109                 break;
3110         default:
3111                 rc = -ENOTTY;
3112                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3113                        obd->obd_name, cmd, current->comm, rc);
3114                 break;
3115         }
3116
3117         module_put(THIS_MODULE);
3118         return rc;
3119 }
3120
3121 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3122                        u32 keylen, void *key, u32 vallen, void *val,
3123                        struct ptlrpc_request_set *set)
3124 {
3125         struct ptlrpc_request *req;
3126         struct obd_device     *obd = exp->exp_obd;
3127         struct obd_import     *imp = class_exp2cliimp(exp);
3128         char                  *tmp;
3129         int                    rc;
3130         ENTRY;
3131
3132         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3133
3134         if (KEY_IS(KEY_CHECKSUM)) {
3135                 if (vallen != sizeof(int))
3136                         RETURN(-EINVAL);
3137                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3138                 RETURN(0);
3139         }
3140
3141         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3142                 sptlrpc_conf_client_adapt(obd);
3143                 RETURN(0);
3144         }
3145
3146         if (KEY_IS(KEY_FLUSH_CTX)) {
3147                 sptlrpc_import_flush_my_ctx(imp);
3148                 RETURN(0);
3149         }
3150
3151         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3152                 struct client_obd *cli = &obd->u.cli;
3153                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3154                 long target = *(long *)val;
3155
3156                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3157                 *(long *)val -= nr;
3158                 RETURN(0);
3159         }
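
        /*
         * Worked example for the KEY_CACHE_LRU_SHRINK branch above
         * (illustrative numbers): with 1000 pages on this client's LRU and a
         * caller target of *val == 300, we attempt min(1000 >> 1, 300) == 300
         * pages; if osc_lru_shrink() frees all of them, *val drops to 0,
         * otherwise the remaining target is left for the caller to spread
         * across other OSCs.
         */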
3160
3161         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3162                 RETURN(-EINVAL);
3163
3164         /* We pass all other commands directly to the OST. Since nobody calls
3165            osc methods directly and everybody is supposed to go through LOV, we
3166            assume LOV checked invalid values for us.
3167            The only recognised values so far are evict_by_nid and mds_conn.
3168            Even if something bad goes through, we'd get a -EINVAL from the OST
3169            anyway. */
3170
3171         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3172                                                 &RQF_OST_SET_GRANT_INFO :
3173                                                 &RQF_OBD_SET_INFO);
3174         if (req == NULL)
3175                 RETURN(-ENOMEM);
3176
3177         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3178                              RCL_CLIENT, keylen);
3179         if (!KEY_IS(KEY_GRANT_SHRINK))
3180                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3181                                      RCL_CLIENT, vallen);
3182         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3183         if (rc) {
3184                 ptlrpc_request_free(req);
3185                 RETURN(rc);
3186         }
3187
3188         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3189         memcpy(tmp, key, keylen);
3190         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3191                                                         &RMF_OST_BODY :
3192                                                         &RMF_SETINFO_VAL);
3193         memcpy(tmp, val, vallen);
3194
3195         if (KEY_IS(KEY_GRANT_SHRINK)) {
3196                 struct osc_grant_args *aa;
3197                 struct obdo *oa;
3198
3199                 aa = ptlrpc_req_async_args(aa, req);
3200                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3201                 if (!oa) {
3202                         ptlrpc_req_finished(req);
3203                         RETURN(-ENOMEM);
3204                 }
3205                 *oa = ((struct ost_body *)val)->oa;
3206                 aa->aa_oa = oa;
3207                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3208         }
3209
3210         ptlrpc_request_set_replen(req);
3211         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3212                 LASSERT(set != NULL);
3213                 ptlrpc_set_add_req(set, req);
3214                 ptlrpc_check_set(NULL, set);
3215         } else {
3216                 ptlrpcd_add_req(req);
3217         }
3218
3219         RETURN(0);
3220 }
3221 EXPORT_SYMBOL(osc_set_info_async);
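
/*
 * Illustrative call sketch (hypothetical caller, not from this file): the
 * KEY_CHECKSUM branch returns before any RPC is built, so a NULL request set
 * is acceptable there; lengths follow the KEY_IS()/vallen checks above:
 *
 *      int on = 1;
 *      rc = osc_set_info_async(env, exp, sizeof(KEY_CHECKSUM) - 1,
 *                              KEY_CHECKSUM, sizeof(on), &on, NULL);
 */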
3222
3223 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3224                   struct obd_device *obd, struct obd_uuid *cluuid,
3225                   struct obd_connect_data *data, void *localdata)
3226 {
3227         struct client_obd *cli = &obd->u.cli;
3228
3229         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3230                 long lost_grant;
3231                 long grant;
3232
3233                 spin_lock(&cli->cl_loi_list_lock);
3234                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3235                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3236                         /* restore ocd_grant_blkbits as client page bits */
3237                         data->ocd_grant_blkbits = PAGE_SHIFT;
3238                         grant += cli->cl_dirty_grant;
3239                 } else {
3240                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3241                 }
3242                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3243                 lost_grant = cli->cl_lost_grant;
3244                 cli->cl_lost_grant = 0;
3245                 spin_unlock(&cli->cl_loi_list_lock);
3246
3247                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3248                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3249                        data->ocd_version, data->ocd_grant, lost_grant);
3250         }
3251
3252         RETURN(0);
3253 }
3254 EXPORT_SYMBOL(osc_reconnect);
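
/*
 * Illustrative numbers for the grant calculation in osc_reconnect() (assumed
 * values, 4 KiB pages, no OBD_CONNECT_GRANT_PARAM): with 16 MiB of available
 * grant, 4 MiB reserved and 256 dirty pages, the client asks the OST to
 * honour
 *
 *      16 MiB + 4 MiB + 256 * 4 KiB = 21 MiB
 *
 * of previously granted space; if all three were zero, it would fall back to
 * 2 * cli_brw_size(obd).
 */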
3255
3256 int osc_disconnect(struct obd_export *exp)
3257 {
3258         struct obd_device *obd = class_exp2obd(exp);
3259         int rc;
3260
3261         rc = client_disconnect_export(exp);
3262         /**
3263          * Initially we put del_shrink_grant before disconnect_export, but it
3264          * causes the following problem if setup (connect) and cleanup
3265          * (disconnect) are tangled together.
3266          *      connect p1                     disconnect p2
3267          *   ptlrpc_connect_import
3268          *     ...............               class_manual_cleanup
3269          *                                     osc_disconnect
3270          *                                     del_shrink_grant
3271          *   ptlrpc_connect_interrupt
3272          *     osc_init_grant
3273          *   add this client to shrink list
3274          *                                      cleanup_osc
3275          * Bang! The grant shrink thread triggers the shrink. (Bug 18662)
3276          */
3277         osc_del_grant_list(&obd->u.cli);
3278         return rc;
3279 }
3280 EXPORT_SYMBOL(osc_disconnect);
3281
3282 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3283                                  struct hlist_node *hnode, void *arg)
3284 {
3285         struct lu_env *env = arg;
3286         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3287         struct ldlm_lock *lock;
3288         struct osc_object *osc = NULL;
3289         ENTRY;
3290
3291         lock_res(res);
3292         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3293                 if (lock->l_ast_data != NULL && osc == NULL) {
3294                         osc = lock->l_ast_data;
3295                         cl_object_get(osc2cl(osc));
3296                 }
3297
3298                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
3299                  * be canceled by the 2nd round of ldlm_namespace_cleanup() in
3300                  * osc_import_event(). */
3301                 ldlm_clear_cleaned(lock);
3302         }
3303         unlock_res(res);
3304
3305         if (osc != NULL) {
3306                 osc_object_invalidate(env, osc);
3307                 cl_object_put(env, osc2cl(osc));
3308         }
3309
3310         RETURN(0);
3311 }
3312 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3313
3314 static int osc_import_event(struct obd_device *obd,
3315                             struct obd_import *imp,
3316                             enum obd_import_event event)
3317 {
3318         struct client_obd *cli;
3319         int rc = 0;
3320
3321         ENTRY;
3322         LASSERT(imp->imp_obd == obd);
3323
3324         switch (event) {
3325         case IMP_EVENT_DISCON: {
3326                 cli = &obd->u.cli;
3327                 spin_lock(&cli->cl_loi_list_lock);
3328                 cli->cl_avail_grant = 0;
3329                 cli->cl_lost_grant = 0;
3330                 spin_unlock(&cli->cl_loi_list_lock);
3331                 break;
3332         }
3333         case IMP_EVENT_INACTIVE: {
3334                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3335                 break;
3336         }
3337         case IMP_EVENT_INVALIDATE: {
3338                 struct ldlm_namespace *ns = obd->obd_namespace;
3339                 struct lu_env         *env;
3340                 __u16                  refcheck;
3341
3342                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3343
3344                 env = cl_env_get(&refcheck);
3345                 if (!IS_ERR(env)) {
3346                         osc_io_unplug(env, &obd->u.cli, NULL);
3347
3348                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3349                                                  osc_ldlm_resource_invalidate,
3350                                                  env, 0);
3351                         cl_env_put(env, &refcheck);
3352
3353                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3354                 } else
3355                         rc = PTR_ERR(env);
3356                 break;
3357         }
3358         case IMP_EVENT_ACTIVE: {
3359                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3360                 break;
3361         }
3362         case IMP_EVENT_OCD: {
3363                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3364
3365                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3366                         osc_init_grant(&obd->u.cli, ocd);
3367
3368                 /* See bug 7198 */
3369                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3370                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3371
3372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3373                 break;
3374         }
3375         case IMP_EVENT_DEACTIVATE: {
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3377                 break;
3378         }
3379         case IMP_EVENT_ACTIVATE: {
3380                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3381                 break;
3382         }
3383         default:
3384                 CERROR("Unknown import event %d\n", event);
3385                 LBUG();
3386         }
3387         RETURN(rc);
3388 }
3389
3390 /**
3391  * Determine whether the lock can be canceled before replaying the lock
3392  * during recovery, see bug16774 for detailed information.
3393  *
3394  * \retval zero the lock can't be canceled
3395  * \retval other ok to cancel
3396  */
3397 static int osc_cancel_weight(struct ldlm_lock *lock)
3398 {
3399         /*
3400          * Cancel all unused and granted extent locks.
3401          */
3402         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3403             ldlm_is_granted(lock) &&
3404             osc_ldlm_weigh_ast(lock) == 0)
3405                 RETURN(1);
3406
3407         RETURN(0);
3408 }
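
/*
 * Editorial note: this weight callback is registered with the namespace in
 * osc_setup() below,
 *
 *      ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 *
 * so the LDLM layer consults it when deciding which locks can be canceled
 * rather than replayed during recovery.
 */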
3409
3410 static int brw_queue_work(const struct lu_env *env, void *data)
3411 {
3412         struct client_obd *cli = data;
3413
3414         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3415
3416         osc_io_unplug(env, cli, NULL);
3417         RETURN(0);
3418 }
3419
3420 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3421 {
3422         struct client_obd *cli = &obd->u.cli;
3423         void *handler;
3424         int rc;
3425
3426         ENTRY;
3427
3428         rc = ptlrpcd_addref();
3429         if (rc)
3430                 RETURN(rc);
3431
3432         rc = client_obd_setup(obd, lcfg);
3433         if (rc)
3434                 GOTO(out_ptlrpcd, rc);
3435
3436
3437         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3438         if (IS_ERR(handler))
3439                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3440         cli->cl_writeback_work = handler;
3441
3442         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3443         if (IS_ERR(handler))
3444                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3445         cli->cl_lru_work = handler;
3446
3447         rc = osc_quota_setup(obd);
3448         if (rc)
3449                 GOTO(out_ptlrpcd_work, rc);
3450
3451         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3452         osc_update_next_shrink(cli);
3453
3454         RETURN(rc);
3455
3456 out_ptlrpcd_work:
3457         if (cli->cl_writeback_work != NULL) {
3458                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3459                 cli->cl_writeback_work = NULL;
3460         }
3461         if (cli->cl_lru_work != NULL) {
3462                 ptlrpcd_destroy_work(cli->cl_lru_work);
3463                 cli->cl_lru_work = NULL;
3464         }
3465         client_obd_cleanup(obd);
3466 out_ptlrpcd:
3467         ptlrpcd_decref();
3468         RETURN(rc);
3469 }
3470 EXPORT_SYMBOL(osc_setup_common);
3471
3472 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3473 {
3474         struct client_obd *cli = &obd->u.cli;
3475         int                adding;
3476         int                added;
3477         int                req_count;
3478         int                rc;
3479
3480         ENTRY;
3481
3482         rc = osc_setup_common(obd, lcfg);
3483         if (rc < 0)
3484                 RETURN(rc);
3485
3486         rc = osc_tunables_init(obd);
3487         if (rc)
3488                 RETURN(rc);
3489
3490         /*
3491          * We try to control the total number of requests with an upper limit,
3492          * osc_reqpool_maxreqcount. There might be some race that causes
3493          * over-limit allocation, but that is fine.
3494          */
3495         req_count = atomic_read(&osc_pool_req_count);
3496         if (req_count < osc_reqpool_maxreqcount) {
3497                 adding = cli->cl_max_rpcs_in_flight + 2;
3498                 if (req_count + adding > osc_reqpool_maxreqcount)
3499                         adding = osc_reqpool_maxreqcount - req_count;
3500
3501                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3502                 atomic_add(added, &osc_pool_req_count);
3503         }
3504
3505         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3506
3507         spin_lock(&osc_shrink_lock);
3508         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3509         spin_unlock(&osc_shrink_lock);
3510         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3511         cli->cl_import->imp_idle_debug = D_HA;
3512
3513         RETURN(0);
3514 }
3515
3516 int osc_precleanup_common(struct obd_device *obd)
3517 {
3518         struct client_obd *cli = &obd->u.cli;
3519         ENTRY;
3520
3521         /* LU-464
3522          * for echo client, export may be on zombie list, wait for
3523          * zombie thread to cull it, because cli.cl_import will be
3524          * cleared in client_disconnect_export():
3525          *   class_export_destroy() -> obd_cleanup() ->
3526          *   echo_device_free() -> echo_client_cleanup() ->
3527          *   obd_disconnect() -> osc_disconnect() ->
3528          *   client_disconnect_export()
3529          */
3530         obd_zombie_barrier();
3531         if (cli->cl_writeback_work) {
3532                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3533                 cli->cl_writeback_work = NULL;
3534         }
3535
3536         if (cli->cl_lru_work) {
3537                 ptlrpcd_destroy_work(cli->cl_lru_work);
3538                 cli->cl_lru_work = NULL;
3539         }
3540
3541         obd_cleanup_client_import(obd);
3542         RETURN(0);
3543 }
3544 EXPORT_SYMBOL(osc_precleanup_common);
3545
3546 static int osc_precleanup(struct obd_device *obd)
3547 {
3548         ENTRY;
3549
3550         osc_precleanup_common(obd);
3551
3552         ptlrpc_lprocfs_unregister_obd(obd);
3553         RETURN(0);
3554 }
3555
3556 int osc_cleanup_common(struct obd_device *obd)
3557 {
3558         struct client_obd *cli = &obd->u.cli;
3559         int rc;
3560
3561         ENTRY;
3562
3563         spin_lock(&osc_shrink_lock);
3564         list_del(&cli->cl_shrink_list);
3565         spin_unlock(&osc_shrink_lock);
3566
3567         /* lru cleanup */
3568         if (cli->cl_cache != NULL) {
3569                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3570                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3571                 list_del_init(&cli->cl_lru_osc);
3572                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3573                 cli->cl_lru_left = NULL;
3574                 cl_cache_decref(cli->cl_cache);
3575                 cli->cl_cache = NULL;
3576         }
3577
3578         /* free memory of osc quota cache */
3579         osc_quota_cleanup(obd);
3580
3581         rc = client_obd_cleanup(obd);
3582
3583         ptlrpcd_decref();
3584         RETURN(rc);
3585 }
3586 EXPORT_SYMBOL(osc_cleanup_common);
3587
3588 static const struct obd_ops osc_obd_ops = {
3589         .o_owner                = THIS_MODULE,
3590         .o_setup                = osc_setup,
3591         .o_precleanup           = osc_precleanup,
3592         .o_cleanup              = osc_cleanup_common,
3593         .o_add_conn             = client_import_add_conn,
3594         .o_del_conn             = client_import_del_conn,
3595         .o_connect              = client_connect_import,
3596         .o_reconnect            = osc_reconnect,
3597         .o_disconnect           = osc_disconnect,
3598         .o_statfs               = osc_statfs,
3599         .o_statfs_async         = osc_statfs_async,
3600         .o_create               = osc_create,
3601         .o_destroy              = osc_destroy,
3602         .o_getattr              = osc_getattr,
3603         .o_setattr              = osc_setattr,
3604         .o_iocontrol            = osc_iocontrol,
3605         .o_set_info_async       = osc_set_info_async,
3606         .o_import_event         = osc_import_event,
3607         .o_quotactl             = osc_quotactl,
3608 };
3609
3610 LIST_HEAD(osc_shrink_list);
3611 DEFINE_SPINLOCK(osc_shrink_lock);
3612
3613 #ifdef HAVE_SHRINKER_COUNT
3614 static struct shrinker osc_cache_shrinker = {
3615         .count_objects  = osc_cache_shrink_count,
3616         .scan_objects   = osc_cache_shrink_scan,
3617         .seeks          = DEFAULT_SEEKS,
3618 };
3619 #else
3620 static int osc_cache_shrink(struct shrinker *shrinker,
3621                             struct shrink_control *sc)
3622 {
3623         (void)osc_cache_shrink_scan(shrinker, sc);
3624
3625         return osc_cache_shrink_count(shrinker, sc);
3626 }
3627
3628 static struct shrinker osc_cache_shrinker = {
3629         .shrink   = osc_cache_shrink,
3630         .seeks    = DEFAULT_SEEKS,
3631 };
3632 #endif
3633
3634 static int __init osc_init(void)
3635 {
3636         unsigned int reqpool_size;
3637         unsigned int reqsize;
3638         int rc;
3639         ENTRY;
3640
3641         /* print an address of _any_ initialized kernel symbol from this
3642          * module, to allow debugging with a gdb that doesn't support data
3643          * symbols from modules. */
3644         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3645
3646         rc = lu_kmem_init(osc_caches);
3647         if (rc)
3648                 RETURN(rc);
3649
3650         rc = class_register_type(&osc_obd_ops, NULL, true,
3651                                  LUSTRE_OSC_NAME, &osc_device_type);
3652         if (rc)
3653                 GOTO(out_kmem, rc);
3654
3655         rc = register_shrinker(&osc_cache_shrinker);
3656         if (rc)
3657                 GOTO(out_type, rc);
3658
3659         /* This is obviously too much memory; just prevent overflow here */
3660         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3661                 GOTO(out_shrinker, rc = -EINVAL);
3662
3663         reqpool_size = osc_reqpool_mem_max << 20;
3664
3665         reqsize = 1;
3666         while (reqsize < OST_IO_MAXREQSIZE)
3667                 reqsize = reqsize << 1;
3668
3669         /*
3670          * We don't enlarge the request count in the OSC pool according to
3671          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3672          * after a normal allocation has failed, so a small OSC pool won't
3673          * cause much performance degradation in most cases.
3674          */
3675         osc_reqpool_maxreqcount = reqpool_size / reqsize;
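
        /*
         * Sizing example (illustrative only; the real OST_IO_MAXREQSIZE is
         * build-dependent): if OST_IO_MAXREQSIZE were just under 64 KiB,
         * reqsize would round up to 65536, and with the default
         * osc_reqpool_mem_max of 5 MiB:
         *
         *      osc_reqpool_maxreqcount = (5 << 20) / 65536 = 80 requests
         */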
3676
3677         atomic_set(&osc_pool_req_count, 0);
3678         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3679                                           ptlrpc_add_rqs_to_pool);
3680
3681         if (osc_rq_pool == NULL)
3682                 GOTO(out_shrinker, rc = -ENOMEM);
3683
3684         rc = osc_start_grant_work();
3685         if (rc != 0)
3686                 GOTO(out_req_pool, rc);
3687
3688         RETURN(rc);
3689
3690 out_req_pool:
3691         ptlrpc_free_rq_pool(osc_rq_pool);
3692 out_shrinker:
3693         unregister_shrinker(&osc_cache_shrinker);
3694 out_type:
3695         class_unregister_type(LUSTRE_OSC_NAME);
3696 out_kmem:
3697         lu_kmem_fini(osc_caches);
3698
3699         RETURN(rc);
3700 }
3701
3702 static void __exit osc_exit(void)
3703 {
3704         osc_stop_grant_work();
3705         unregister_shrinker(&osc_cache_shrinker);
3706         class_unregister_type(LUSTRE_OSC_NAME);
3707         lu_kmem_fini(osc_caches);
3708         ptlrpc_free_rq_pool(osc_rq_pool);
3709 }
3710
3711 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3712 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3713 MODULE_VERSION(LUSTRE_VERSION_STRING);
3714 MODULE_LICENSE("GPL");
3715
3716 module_init(osc_init);
3717 module_exit(osc_exit);