LU-14798 lustre: Support RDMA only pages
lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"
#include <lnet/lnet_rdma.h>

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

/* seconds a client connection may stay idle before being disconnected */
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

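/* Pack @oa into the preallocated OST_BODY buffer of @req, converting the
 * local obdo to its wire representation according to the connect data
 * negotiated with the server. */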
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

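/* Issue an OST_SETATTR without blocking the caller.  If @rqset is NULL the
 * request is handed straight to ptlrpcd and the reply is ignored; otherwise
 * the reply is delivered to @upcall through osc_setattr_interpret(). */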
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

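/* Send an OST_PUNCH to truncate or punch a hole in an object.  The request
 * is always processed asynchronously by ptlrpcd; completion is reported to
 * @upcall with @cookie, reusing the setattr interpret path. */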
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC finishes
 * @cookie:     Opaque data passed back to @upcall
 * @mode:       Operation done on given range
 *
 * Only block allocation (the standard preallocate operation) is currently
 * supported; other mode flags are not supported yet. ftruncate(2) and
 * truncate(2) are handled via a SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matching @mode in the resource named by
 * @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

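/* Reserve a slot for one more destroy RPC, keeping at most
 * cl_max_rpcs_in_flight of them in flight.  The increment optimistically
 * claims a slot; if the limit is exceeded, the decrement may race with a
 * concurrent completion, so wake the waitqueue in that window to avoid a
 * lost wakeup. */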
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

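/* Fill in the dirty and grant accounting fields of @oa so each RPC tells
 * the server how much dirty cache the client holds (o_dirty), how much more
 * it may want to dirty (o_undirty), how much grant it lost (o_dropped) and
 * how much it still holds (o_grant). */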
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                      cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * Grant worker data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

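/* Give a quarter of the locally available grant back to the server by
 * carrying it in @oa with OBD_FL_SHRINK_GRANT set, then reschedule the next
 * shrink interval. */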
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

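/* Cap the number of shrink RPCs issued in one pass of the grant worker;
 * clients beyond the batch are picked up on a later pass. */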
#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant worker that returns grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
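                /* Illustrative values (not from the source): with 64 KiB
                 * chunks on 4 KiB pages, cl_chunkbits - PAGE_SHIFT == 4, so
                 * chunk_mask == ~15 and a limit of 100 pages is rounded up
                 * to 112, i.e. 7 chunks of 16 pages each.
                 */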
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

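/* Verify the per-niobuf return codes of a BRW_WRITE reply, and that the
 * bulk transferred exactly the number of bytes requested. */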
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

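/* Two brw_pages may share one niobuf only when they are contiguous in the
 * file and their flags differ at most in bits known to be safe to combine. */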
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page.
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

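/* Non-T10 variant: hash the page data of the whole bulk with the single
 * checksum algorithm selected by @cksum_type. */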
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

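/* Pick the checksum implementation for a bulk: use the T10-PI path when
 * @cksum_type maps to a DIF checksum function, otherwise fall back to the
 * plain bulk checksum. */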
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

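/* Prepare a BRW (OST_READ/OST_WRITE) request covering @page_count pages.
 * Write requests come from the shared request pool; for encrypted inodes
 * the pages are first run through llcrypt bounce pages, with count/off
 * widened to full encryption units. */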
1377 static int
1378 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1379                      u32 page_count, struct brw_page **pga,
1380                      struct ptlrpc_request **reqp, int resend)
1381 {
1382         struct ptlrpc_request *req;
1383         struct ptlrpc_bulk_desc *desc;
1384         struct ost_body *body;
1385         struct obd_ioobj *ioobj;
1386         struct niobuf_remote *niobuf;
1387         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1388         struct osc_brw_async_args *aa;
1389         struct req_capsule *pill;
1390         struct brw_page *pg_prev;
1391         void *short_io_buf;
1392         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1393         struct inode *inode = NULL;
1394         bool directio = false;
1395         bool enable_checksum = true;
1396
1397         ENTRY;
1398         if (pga[0]->pg) {
1399                 inode = page2inode(pga[0]->pg);
1400                 if (inode == NULL) {
1401                         /* Try to get reference to inode from cl_page if we are
1402                          * dealing with direct IO, as handled pages are not
1403                          * actual page cache pages.
1404                          */
1405                         struct osc_async_page *oap = brw_page2oap(pga[0]);
1406                         struct cl_page *clpage = oap2cl_page(oap);
1407
1408                         inode = clpage->cp_inode;
1409                         if (inode)
1410                                 directio = true;
1411                 }
1412         }
1413         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1414                 RETURN(-ENOMEM); /* Recoverable */
1415         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1416                 RETURN(-EINVAL); /* Fatal */
1417
1418         if ((cmd & OBD_BRW_WRITE) != 0) {
1419                 opc = OST_WRITE;
1420                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1421                                                 osc_rq_pool,
1422                                                 &RQF_OST_BRW_WRITE);
1423         } else {
1424                 opc = OST_READ;
1425                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1426         }
1427         if (req == NULL)
1428                 RETURN(-ENOMEM);
1429
1430         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1431                 for (i = 0; i < page_count; i++) {
1432                         struct brw_page *pg = pga[i];
1433                         struct page *data_page = NULL;
1434                         bool retried = false;
1435                         bool lockedbymyself;
1436                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1437                         struct address_space *map_orig = NULL;
1438                         pgoff_t index_orig;
1439
1440 retry_encrypt:
1441                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1442                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1443                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1444                         /* The page can already be locked when we arrive here.
1445                          * This is possible when cl_page_assume/vvp_page_assume
1446                          * is stuck on wait_on_page_writeback with page lock
1447                          * held. In this case there is no risk for the lock to
1448                          * be released while we are doing our encryption
1449                          * processing, because writeback against that page will
1450                          * end in vvp_page_completion_write/cl_page_completion,
1451                          * which means only once the page is fully processed.
1452                          */
1453                         lockedbymyself = trylock_page(pg->pg);
1454                         if (directio) {
1455                                 map_orig = pg->pg->mapping;
1456                                 pg->pg->mapping = inode->i_mapping;
1457                                 index_orig = pg->pg->index;
1458                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1459                         }
1460                         data_page =
1461                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1462                                                                  nunits, 0,
1463                                                                  GFP_NOFS);
1464                         if (directio) {
1465                                 pg->pg->mapping = map_orig;
1466                                 pg->pg->index = index_orig;
1467                         }
1468                         if (lockedbymyself)
1469                                 unlock_page(pg->pg);
1470                         if (IS_ERR(data_page)) {
1471                                 rc = PTR_ERR(data_page);
1472                                 if (rc == -ENOMEM && !retried) {
1473                                         retried = true;
1474                                         rc = 0;
1475                                         goto retry_encrypt;
1476                                 }
1477                                 ptlrpc_request_free(req);
1478                                 RETURN(rc);
1479                         }
1480                         /* Set PageChecked flag on bounce page for
1481                          * disambiguation in osc_release_bounce_pages().
1482                          */
1483                         SetPageChecked(data_page);
1484                         pg->pg = data_page;
1485                         /* there should be no gap in the middle of the page array */
1486                         if (i == page_count - 1) {
1487                                 struct osc_async_page *oap = brw_page2oap(pg);
1488
1489                                 oa->o_size = oap->oap_count +
1490                                         oap->oap_obj_off + oap->oap_page_off;
1491                         }
1492                         /* len is forced to nunits, and the relative offset
1493                          * to 0, so store the old, clear-text values
1494                          */
1495                         pg->bp_count_diff = nunits - pg->count;
1496                         pg->count = nunits;
1497                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1498                         pg->off = pg->off & PAGE_MASK;
1499                 }
1500         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1501                 for (i = 0; i < page_count; i++) {
1502                         struct brw_page *pg = pga[i];
1503                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1504
1505                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1506                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1507                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1508                         /* count/off are forced to cover the whole encryption
1509                          * unit size so that all encrypted data is stored on
1510                          * the OST; adjust bp_{count,off}_diff to remember the
1511                          * size of the clear text.
1512                          */
1513                         pg->bp_count_diff = nunits - pg->count;
1514                         pg->count = nunits;
1515                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1516                         pg->off = pg->off & PAGE_MASK;
1517                 }
1518         }
1519
1520         for (niocount = i = 1; i < page_count; i++) {
1521                 if (!can_merge_pages(pga[i - 1], pga[i]))
1522                         niocount++;
1523         }
1524
1525         pill = &req->rq_pill;
1526         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1527                              sizeof(*ioobj));
1528         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1529                              niocount * sizeof(*niobuf));
1530
1531         for (i = 0; i < page_count; i++) {
1532                 short_io_size += pga[i]->count;
1533                 if (!inode || !IS_ENCRYPTED(inode)) {
1534                         pga[i]->bp_count_diff = 0;
1535                         pga[i]->bp_off_diff = 0;
1536                 }
1537         }
1538
1539         if (lnet_is_rdma_only_page(pga[0]->pg)) {
1540                 enable_checksum = false;
1541                 short_io_size = 0;
1542         }
1543
1544         /* Check if read/write is small enough to be a short io. */
1545         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1546             !imp_connect_shortio(cli->cl_import))
1547                 short_io_size = 0;
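        /*
         * Illustrative predicate (a sketch of the checks above, not a
         * helper that exists in this file): inline "short i/o" is only
         * used when the whole transfer fits in a single contiguous
         * niobuf, the server advertised support for it, and the pages
         * are not RDMA-only (see the lnet_is_rdma_only_page() check):
         *
         *      static bool brw_fits_short_io(struct client_obd *cli,
         *                                    u32 nob, int niocount)
         *      {
         *              return nob != 0 && nob <= cli->cl_max_short_io_bytes &&
         *                     niocount == 1 &&
         *                     imp_connect_shortio(cli->cl_import);
         *      }
         */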
1548
1549         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1550                              opc == OST_READ ? 0 : short_io_size);
1551         if (opc == OST_READ)
1552                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1553                                      short_io_size);
1554
1555         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1556         if (rc) {
1557                 ptlrpc_request_free(req);
1558                 RETURN(rc);
1559         }
1560         osc_set_io_portal(req);
1561
1562         ptlrpc_at_set_req_timeout(req);
1563         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1564          * retry logic */
1565         req->rq_no_retry_einprogress = 1;
1566
1567         if (short_io_size != 0) {
1568                 desc = NULL;
1569                 short_io_buf = NULL;
1570                 goto no_bulk;
1571         }
1572
1573         desc = ptlrpc_prep_bulk_imp(req, page_count,
1574                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1575                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1576                         PTLRPC_BULK_PUT_SINK),
1577                 OST_BULK_PORTAL,
1578                 &ptlrpc_bulk_kiov_pin_ops);
1579
1580         if (desc == NULL)
1581                 GOTO(out, rc = -ENOMEM);
1582         /* NB request now owns desc and will free it when it gets freed */
1583 no_bulk:
1584         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1585         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1586         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1587         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1588
1589         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1590
1591         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1592          * and from_kgid(), because they are asynchronous. Fortunately, variable
1593          * oa contains valid o_uid and o_gid for these two operations.
1594          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1595          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1596          * other process logic */
1597         body->oa.o_uid = oa->o_uid;
1598         body->oa.o_gid = oa->o_gid;
1599
1600         obdo_to_ioobj(oa, ioobj);
1601         ioobj->ioo_bufcnt = niocount;
1602         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1603          * of bulks that might be sent for this request.  The actual number is
1604          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1605          * sends "max - 1" for compatibility with old clients sending "0", and
1606          * also so the actual maximum is a power-of-two, not one less. LU-1431 */
1607         if (desc != NULL)
1608                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1609         else /* short io */
1610                 ioobj_max_brw_set(ioobj, 0);
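        /*
         * Encoding example (hedged sketch, assuming the usual
         * ioobj_max_brw_{set,get} pair where the reader adds one back):
         * a client allowing 4 bulk MDs stores "4 - 1" = 3 in the high
         * bits, which is decoded as 3 + 1 = 4; an old client that always
         * sends 0 is thus decoded as a maximum of 1 bulk, and the decoded
         * maximum stays a power-of-two value.
         */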
1611
1612         if (short_io_size != 0) {
1613                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1614                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1615                         body->oa.o_flags = 0;
1616                 }
1617                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1618                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1619                        short_io_size);
1620                 if (opc == OST_WRITE) {
1621                         short_io_buf = req_capsule_client_get(pill,
1622                                                               &RMF_SHORT_IO);
1623                         LASSERT(short_io_buf != NULL);
1624                 }
1625         }
1626
1627         LASSERT(page_count > 0);
1628         pg_prev = pga[0];
1629         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1630                 struct brw_page *pg = pga[i];
1631                 int poff = pg->off & ~PAGE_MASK;
1632
1633                 LASSERT(pg->count > 0);
1634                 /* make sure there is no gap in the middle of the page array */
1635                 LASSERTF(page_count == 1 ||
1636                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1637                           ergo(i > 0 && i < page_count - 1,
1638                                poff == 0 && pg->count == PAGE_SIZE)   &&
1639                           ergo(i == page_count - 1, poff == 0)),
1640                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1641                          i, page_count, pg, pg->off, pg->count);
1642                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1643                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1644                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1645                          i, page_count,
1646                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1647                          pg_prev->pg, page_private(pg_prev->pg),
1648                          pg_prev->pg->index, pg_prev->off);
1649                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1650                         (pg->flag & OBD_BRW_SRVLOCK));
1651                 if (short_io_size != 0 && opc == OST_WRITE) {
1652                         unsigned char *ptr = kmap_atomic(pg->pg);
1653
1654                         LASSERT(short_io_size >= requested_nob + pg->count);
1655                         memcpy(short_io_buf + requested_nob,
1656                                ptr + poff,
1657                                pg->count);
1658                         kunmap_atomic(ptr);
1659                 } else if (short_io_size == 0) {
1660                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1661                                                          pg->count);
1662                 }
1663                 requested_nob += pg->count;
1664
1665                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1666                         niobuf--;
1667                         niobuf->rnb_len += pg->count;
1668                 } else {
1669                         niobuf->rnb_offset = pg->off;
1670                         niobuf->rnb_len    = pg->count;
1671                         niobuf->rnb_flags  = pg->flag;
1672                 }
1673                 pg_prev = pg;
1674         }
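        /*
         * Merge example (illustrative): three contiguous 4 KiB pages at
         * file offsets 0, 4096 and 8192, with compatible flags, collapse
         * into a single remote niobuf { rnb_offset = 0, rnb_len = 12288 }
         * via the can_merge_pages() branch above, matching niocount == 1
         * computed earlier.
         */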
1675
1676         LASSERTF((void *)(niobuf - niocount) ==
1677                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1678                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1679                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1680
1681         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1682         if (resend) {
1683                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1684                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1685                         body->oa.o_flags = 0;
1686                 }
1687                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1688         }
1689
1690         if (osc_should_shrink_grant(cli))
1691                 osc_shrink_grant_local(cli, &body->oa);
1692
1693         if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1694                 enable_checksum = false;
1695
1696         /* size[REQ_REC_OFF] still sizeof (*body) */
1697         if (opc == OST_WRITE) {
1698                 if (enable_checksum) {
1699                         /* store cl_cksum_type in a local variable since
1700                          * it can be changed via lprocfs */
1701                         enum cksum_types cksum_type = cli->cl_cksum_type;
1702
1703                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1704                                 body->oa.o_flags = 0;
1705
1706                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1707                                                                 cksum_type);
1708                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1709
1710                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1711                                                   requested_nob, page_count,
1712                                                   pga, OST_WRITE,
1713                                                   &body->oa.o_cksum);
1714                         if (rc < 0) {
1715                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1716                                        rc);
1717                                 GOTO(out, rc);
1718                         }
1719                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1720                                body->oa.o_cksum);
1721
1722                         /* save this in 'oa', too, for later checking */
1723                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1724                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1725                                                            cksum_type);
1726                 } else {
1727                         /* clear out the checksum flag, in case this is a
1728                          * resend but cl_checksum is no longer set. b=11238 */
1729                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1730                 }
1731                 oa->o_cksum = body->oa.o_cksum;
1732                 /* 1 RC per niobuf */
1733                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1734                                      sizeof(__u32) * niocount);
1735         } else {
1736                 if (enable_checksum) {
1737                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1738                                 body->oa.o_flags = 0;
1739                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1740                                 cli->cl_cksum_type);
1741                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1742                 }
1743
1744                 /* The client cksum has already been copied to the wire obdo in
1745                  * the previous lustre_set_wire_obdo(); in case a bulk read is
1746                  * resent due to a cksum error, this allows the server to
1747                  * check and dump the pages on its side */
1748         }
1749         ptlrpc_request_set_replen(req);
1750
1751         aa = ptlrpc_req_async_args(aa, req);
1752         aa->aa_oa = oa;
1753         aa->aa_requested_nob = requested_nob;
1754         aa->aa_nio_count = niocount;
1755         aa->aa_page_count = page_count;
1756         aa->aa_resends = 0;
1757         aa->aa_ppga = pga;
1758         aa->aa_cli = cli;
1759         INIT_LIST_HEAD(&aa->aa_oaps);
1760
1761         *reqp = req;
1762         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1763         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1764                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1765                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1766         RETURN(0);
1767
1768  out:
1769         ptlrpc_req_finished(req);
1770         RETURN(rc);
1771 }
1772
1773 char dbgcksum_file_name[PATH_MAX];
1774
1775 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1776                                 struct brw_page **pga, __u32 server_cksum,
1777                                 __u32 client_cksum)
1778 {
1779         struct file *filp;
1780         int rc, i;
1781         unsigned int len;
1782         char *buf;
1783
1784         /* will only keep a dump of the pages on the first error for the same
1785          * range in the file/fid, not during resends/retries. */
1786         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1787                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1788                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1789                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1790                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1791                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1792                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1793                  pga[0]->off,
1794                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1795                  client_cksum, server_cksum);
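        /*
         * Example name (values purely illustrative, assuming the default
         * "/tmp/lustre-log" debug file path):
         * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x2:0x0]:[0-1048575]-a1b2c3d4-e5f6a7b8
         */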
1796         filp = filp_open(dbgcksum_file_name,
1797                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1798         if (IS_ERR(filp)) {
1799                 rc = PTR_ERR(filp);
1800                 if (rc == -EEXIST)
1801                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1802                                "checksum error: rc = %d\n", dbgcksum_file_name,
1803                                rc);
1804                 else
1805                         CERROR("%s: can't open to dump pages with checksum "
1806                                "error: rc = %d\n", dbgcksum_file_name, rc);
1807                 return;
1808         }
1809
1810         for (i = 0; i < page_count; i++) {
1811                 len = pga[i]->count;
1812                 buf = kmap(pga[i]->pg);
1813                 while (len != 0) {
1814                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1815                         if (rc < 0) {
1816                                 CERROR("%s: wanted to write %u but got %d "
1817                                        "error\n", dbgcksum_file_name, len, rc);
1818                                 break;
1819                         }
1820                         len -= rc;
1821                         buf += rc;
1822                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1823                                dbgcksum_file_name, rc);
1824                 }
1825                 kunmap(pga[i]->pg);
1826         }
1827
1828         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1829         if (rc)
1830                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1831         filp_close(filp, NULL);
1832 }
1833
1834 static int
1835 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1836                      __u32 client_cksum, __u32 server_cksum,
1837                      struct osc_brw_async_args *aa)
1838 {
1839         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1840         enum cksum_types cksum_type;
1841         obd_dif_csum_fn *fn = NULL;
1842         int sector_size = 0;
1843         __u32 new_cksum;
1844         char *msg;
1845         int rc;
1846
1847         if (server_cksum == client_cksum) {
1848                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1849                 return 0;
1850         }
1851
1852         if (aa->aa_cli->cl_checksum_dump)
1853                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1854                                     server_cksum, client_cksum);
1855
1856         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1857                                            oa->o_flags : 0);
1858
1859         switch (cksum_type) {
1860         case OBD_CKSUM_T10IP512:
1861                 fn = obd_dif_ip_fn;
1862                 sector_size = 512;
1863                 break;
1864         case OBD_CKSUM_T10IP4K:
1865                 fn = obd_dif_ip_fn;
1866                 sector_size = 4096;
1867                 break;
1868         case OBD_CKSUM_T10CRC512:
1869                 fn = obd_dif_crc_fn;
1870                 sector_size = 512;
1871                 break;
1872         case OBD_CKSUM_T10CRC4K:
1873                 fn = obd_dif_crc_fn;
1874                 sector_size = 4096;
1875                 break;
1876         default:
1877                 break;
1878         }
1879
1880         if (fn)
1881                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1882                                              aa->aa_page_count, aa->aa_ppga,
1883                                              OST_WRITE, fn, sector_size,
1884                                              &new_cksum);
1885         else
1886                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1887                                        aa->aa_ppga, OST_WRITE, cksum_type,
1888                                        &new_cksum);
1889
1890         if (rc < 0)
1891                 msg = "failed to calculate the client write checksum";
1892         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1893                 msg = "the server did not use the checksum type specified in "
1894                       "the original request - likely a protocol problem";
1895         else if (new_cksum == server_cksum)
1896                 msg = "changed on the client after we checksummed it - "
1897                       "likely false positive due to mmap IO (bug 11742)";
1898         else if (new_cksum == client_cksum)
1899                 msg = "changed in transit before arrival at OST";
1900         else
1901                 msg = "changed in transit AND doesn't match the original - "
1902                       "likely false positive due to mmap IO (bug 11742)";
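        /*
         * Decision table for the diagnosis above (restating the branches):
         *   recomputed == server cksum -> data changed on the client after
         *                                 the original checksum was taken;
         *   recomputed == client cksum -> data changed in transit to the OST;
         *   neither                    -> changed in transit and no longer
         *                                 matches the original either.
         */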
1903
1904         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1905                            DFID " object "DOSTID" extent [%llu-%llu], original "
1906                            "client csum %x (type %x), server csum %x (type %x),"
1907                            " client csum now %x\n",
1908                            obd_name, msg, libcfs_nid2str(peer->nid),
1909                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1910                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1911                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1912                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1913                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1914                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1915                            client_cksum,
1916                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1917                            server_cksum, cksum_type, new_cksum);
1918         return 1;
1919 }
1920
1921 /* Note: rc enters this function as the number of bytes transferred */
1922 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1923 {
1924         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1925         struct client_obd *cli = aa->aa_cli;
1926         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1927         const struct lnet_process_id *peer =
1928                 &req->rq_import->imp_connection->c_peer;
1929         struct ost_body *body;
1930         u32 client_cksum = 0;
1931         struct inode *inode;
1932         unsigned int blockbits = 0, blocksize = 0;
1933
1934         ENTRY;
1935
1936         if (rc < 0 && rc != -EDQUOT) {
1937                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1938                 RETURN(rc);
1939         }
1940
1941         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1942         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1943         if (body == NULL) {
1944                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1945                 RETURN(-EPROTO);
1946         }
1947
1948         /* set/clear over quota flag for a uid/gid/projid */
1949         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1950             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1951                 unsigned qid[LL_MAXQUOTAS] = {
1952                                          body->oa.o_uid, body->oa.o_gid,
1953                                          body->oa.o_projid };
1954                 CDEBUG(D_QUOTA,
1955                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1956                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1957                        body->oa.o_valid, body->oa.o_flags);
1958                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1959                                 body->oa.o_flags);
1960         }
1961
1962         osc_update_grant(cli, body);
1963
1964         if (rc < 0)
1965                 RETURN(rc);
1966
1967         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1968                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1969
1970         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1971                 if (rc > 0) {
1972                         CERROR("%s: unexpected positive size %d\n",
1973                                obd_name, rc);
1974                         RETURN(-EPROTO);
1975                 }
1976
1977                 if (req->rq_bulk != NULL &&
1978                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1979                         RETURN(-EAGAIN);
1980
1981                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1982                     check_write_checksum(&body->oa, peer, client_cksum,
1983                                          body->oa.o_cksum, aa))
1984                         RETURN(-EAGAIN);
1985
1986                 rc = check_write_rcs(req, aa->aa_requested_nob,
1987                                      aa->aa_nio_count, aa->aa_page_count,
1988                                      aa->aa_ppga);
1989                 GOTO(out, rc);
1990         }
1991
1992         /* The rest of this function executes only for OST_READs */
1993
1994         if (req->rq_bulk == NULL) {
1995                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1996                                           RCL_SERVER);
1997                 LASSERT(rc == req->rq_status);
1998         } else {
1999                 /* if unwrap_bulk failed, return -EAGAIN to retry */
2000                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2001         }
2002         if (rc < 0)
2003                 GOTO(out, rc = -EAGAIN);
2004
2005         if (rc > aa->aa_requested_nob) {
2006                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2007                        rc, aa->aa_requested_nob);
2008                 RETURN(-EPROTO);
2009         }
2010
2011         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2012                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2013                        rc, req->rq_bulk->bd_nob_transferred);
2014                 RETURN(-EPROTO);
2015         }
2016
2017         if (req->rq_bulk == NULL) {
2018                 /* short io */
2019                 int nob, pg_count, i = 0;
2020                 unsigned char *buf;
2021
2022                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2023                 pg_count = aa->aa_page_count;
2024                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2025                                                    rc);
2026                 nob = rc;
2027                 while (nob > 0 && pg_count > 0) {
2028                         unsigned char *ptr;
2029                         int count = aa->aa_ppga[i]->count > nob ?
2030                                     nob : aa->aa_ppga[i]->count;
2031
2032                         CDEBUG(D_CACHE, "page %p count %d\n",
2033                                aa->aa_ppga[i]->pg, count);
2034                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2035                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2036                                count);
2037                         kunmap_atomic((void *) ptr);
2038
2039                         buf += count;
2040                         nob -= count;
2041                         i++;
2042                         pg_count--;
2043                 }
2044         }
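        /*
         * Copy-out example (illustrative): with two 4 KiB pages and a
         * 6000-byte inline reply, the loop above memcpy()s 4096 bytes
         * into the first page and the remaining 1904 bytes into the
         * second, honouring each page's in-page offset.
         */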
2045
2046         if (rc < aa->aa_requested_nob)
2047                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2048
2049         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2050                 static int cksum_counter;
2051                 u32        server_cksum = body->oa.o_cksum;
2052                 char      *via = "";
2053                 char      *router = "";
2054                 enum cksum_types cksum_type;
2055                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2056                         body->oa.o_flags : 0;
2057
2058                 cksum_type = obd_cksum_type_unpack(o_flags);
2059                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2060                                           aa->aa_page_count, aa->aa_ppga,
2061                                           OST_READ, &client_cksum);
2062                 if (rc < 0)
2063                         GOTO(out, rc);
2064
2065                 if (req->rq_bulk != NULL &&
2066                     peer->nid != req->rq_bulk->bd_sender) {
2067                         via = " via ";
2068                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2069                 }
2070
2071                 if (server_cksum != client_cksum) {
2072                         struct ost_body *clbody;
2073                         u32 page_count = aa->aa_page_count;
2074
2075                         clbody = req_capsule_client_get(&req->rq_pill,
2076                                                         &RMF_OST_BODY);
2077                         if (cli->cl_checksum_dump)
2078                                 dump_all_bulk_pages(&clbody->oa, page_count,
2079                                                     aa->aa_ppga, server_cksum,
2080                                                     client_cksum);
2081
2082                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2083                                            "%s%s%s inode "DFID" object "DOSTID
2084                                            " extent [%llu-%llu], client %x, "
2085                                            "server %x, cksum_type %x\n",
2086                                            obd_name,
2087                                            libcfs_nid2str(peer->nid),
2088                                            via, router,
2089                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2090                                                 clbody->oa.o_parent_seq : 0ULL,
2091                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2092                                                 clbody->oa.o_parent_oid : 0,
2093                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2094                                                 clbody->oa.o_parent_ver : 0,
2095                                            POSTID(&body->oa.o_oi),
2096                                            aa->aa_ppga[0]->off,
2097                                            aa->aa_ppga[page_count-1]->off +
2098                                            aa->aa_ppga[page_count-1]->count - 1,
2099                                            client_cksum, server_cksum,
2100                                            cksum_type);
2101                         cksum_counter = 0;
2102                         aa->aa_oa->o_cksum = client_cksum;
2103                         rc = -EAGAIN;
2104                 } else {
2105                         cksum_counter++;
2106                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2107                         rc = 0;
2108                 }
2109         } else if (unlikely(client_cksum)) {
2110                 static int cksum_missed;
2111
2112                 cksum_missed++;
2113                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2114                         CERROR("%s: checksum %u requested from %s but not sent\n",
2115                                obd_name, cksum_missed,
2116                                libcfs_nid2str(peer->nid));
2117         } else {
2118                 rc = 0;
2119         }
2120
2121         inode = page2inode(aa->aa_ppga[0]->pg);
2122         if (inode == NULL) {
2123                 /* Try to get a reference to the inode from the cl_page if
2124                  * we are dealing with direct I/O, as the pages handled here
2125                  * are not actual page cache pages.
2126                  */
2127                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2128
2129                 inode = oap2cl_page(oap)->cp_inode;
2130                 if (inode) {
2131                         blockbits = inode->i_blkbits;
2132                         blocksize = 1 << blockbits;
2133                 }
2134         }
2135         if (inode && IS_ENCRYPTED(inode)) {
2136                 int idx;
2137
2138                 if (!llcrypt_has_encryption_key(inode)) {
2139                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2140                         GOTO(out, rc);
2141                 }
2142                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2143                         struct brw_page *pg = aa->aa_ppga[idx];
2144                         unsigned int offs = 0;
2145
2146                         while (offs < PAGE_SIZE) {
2147                                 /* do not decrypt if page is all 0s */
2148                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2149                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2150                                         /* if the page is empty, forward
2151                                          * this info to upper layers
2152                                          * (ll_io_zero_page) by clearing
2153                                          * PagePrivate2 */
2154                                         if (!offs)
2155                                                 ClearPagePrivate2(pg->pg);
2156                                         break;
2157                                 }
2158
2159                                 if (blockbits) {
2160                                         /* This is the direct I/O case: call
2161                                          * the decrypt variant that takes the
2162                                          * inode as an input parameter. The
2163                                          * page does not need to be locked.
2164                                          */
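                                        /*
                                         * Worked example (illustrative;
                                         * assumes 4 KiB blocks, i.e.
                                         * blockbits == 12, and 4 KiB
                                         * pages): for pg->off of 8 MiB,
                                         * pg->off >> PAGE_SHIFT is 2048
                                         * and there is one block per
                                         * page, so the unit at offs == 0
                                         * decrypts logical block 2048.
                                         */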
2165                                         u64 lblk_num =
2166                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2167                                                      (PAGE_SHIFT - blockbits)) +
2168                                                        (offs >> blockbits);
2169                                         unsigned int i;
2170
2171                                         for (i = offs;
2172                                              i < offs +
2173                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2174                                              i += blocksize, lblk_num++) {
2175                                                 rc =
2176                                                   llcrypt_decrypt_block_inplace(
2177                                                           inode, pg->pg,
2178                                                           blocksize, i,
2179                                                           lblk_num);
2180                                                 if (rc)
2181                                                         break;
2182                                         }
2183                                 } else {
2184                                         rc = llcrypt_decrypt_pagecache_blocks(
2185                                                 pg->pg,
2186                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2187                                                 offs);
2188                                 }
2189                                 if (rc)
2190                                         GOTO(out, rc);
2191
2192                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2193                         }
2194                 }
2195         }
2196
2197 out:
2198         if (rc >= 0)
2199                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2200                                      aa->aa_oa, &body->oa);
2201
2202         RETURN(rc);
2203 }
2204
2205 static int osc_brw_redo_request(struct ptlrpc_request *request,
2206                                 struct osc_brw_async_args *aa, int rc)
2207 {
2208         struct ptlrpc_request *new_req;
2209         struct osc_brw_async_args *new_aa;
2210         struct osc_async_page *oap;
2211         ENTRY;
2212
2213         /* The below message is checked in replay-ost-single.sh test_8ae */
2214         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2215                   "redo for recoverable error %d", rc);
2216
2217         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2218                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2219                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2220                                   aa->aa_ppga, &new_req, 1);
2221         if (rc)
2222                 RETURN(rc);
2223
2224         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2225                 if (oap->oap_request != NULL) {
2226                         LASSERTF(request == oap->oap_request,
2227                                  "request %p != oap_request %p\n",
2228                                  request, oap->oap_request);
2229                 }
2230         }
2231         /*
2232          * The new request takes over the pga and oaps from the old request.
2233          * Note that copying a list_head doesn't work; it needs to be moved.
2234          */
2235         aa->aa_resends++;
2236         new_req->rq_interpret_reply = request->rq_interpret_reply;
2237         new_req->rq_async_args = request->rq_async_args;
2238         new_req->rq_commit_cb = request->rq_commit_cb;
2239         /* Cap the resend delay to the current request timeout; this is
2240          * similar to what ptlrpc does (see after_reply()) */
2241         if (aa->aa_resends > new_req->rq_timeout)
2242                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2243         else
2244                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
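        /*
         * Delay example (illustrative): with rq_timeout == 30s, the 3rd
         * resend is delayed by 3 seconds, while the 40th would exceed the
         * timeout and is therefore capped at 30 seconds.
         */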
2245         new_req->rq_generation_set = 1;
2246         new_req->rq_import_generation = request->rq_import_generation;
2247
2248         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2249
2250         INIT_LIST_HEAD(&new_aa->aa_oaps);
2251         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2252         INIT_LIST_HEAD(&new_aa->aa_exts);
2253         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2254         new_aa->aa_resends = aa->aa_resends;
2255
2256         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2257                 if (oap->oap_request) {
2258                         ptlrpc_req_finished(oap->oap_request);
2259                         oap->oap_request = ptlrpc_request_addref(new_req);
2260                 }
2261         }
2262
2263         /* XXX: This code will run into problems if we ever want to support
2264          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
2265          * and waiting for all of them to finish. We should inherit the
2266          * request set from the old request. */
2267         ptlrpcd_add_req(new_req);
2268
2269         DEBUG_REQ(D_INFO, new_req, "new request");
2270         RETURN(0);
2271 }
2272
2273 /*
2274  * Ugh, we want disk allocation on the target to happen in offset order.
2275  * We'll follow Sedgewick's advice and stick to the dead simple shellsort
2276  * -- it'll do fine for our small page arrays and doesn't require
2277  * allocation. It's an insertion sort that swaps elements that are strides
2278  * apart, shrinking the stride down until it's '1' and the array is sorted.
2279  */
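/*
 * Stride example (illustrative): the loop below grows the stride through
 * the 3h+1 sequence 1, 4, 13, 40, 121, ... For num == 100 it stops at
 * 121, and the do/while then sorts with strides 40, 13, 4 and finally 1,
 * at which point the last pass is a plain insertion sort over a
 * nearly-sorted array.
 */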
2280 static void sort_brw_pages(struct brw_page **array, int num)
2281 {
2282         int stride, i, j;
2283         struct brw_page *tmp;
2284
2285         if (num == 1)
2286                 return;
2287         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2288                 ;
2289
2290         do {
2291                 stride /= 3;
2292                 for (i = stride ; i < num ; i++) {
2293                         tmp = array[i];
2294                         j = i;
2295                         while (j >= stride && array[j - stride]->off > tmp->off) {
2296                                 array[j] = array[j - stride];
2297                                 j -= stride;
2298                         }
2299                         array[j] = tmp;
2300                 }
2301         } while (stride > 1);
2302 }
2303
2304 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2305 {
2306         LASSERT(ppga != NULL);
2307         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2308 }
2309
2310 static int brw_interpret(const struct lu_env *env,
2311                          struct ptlrpc_request *req, void *args, int rc)
2312 {
2313         struct osc_brw_async_args *aa = args;
2314         struct osc_extent *ext;
2315         struct osc_extent *tmp;
2316         struct client_obd *cli = aa->aa_cli;
2317         unsigned long transferred = 0;
2318
2319         ENTRY;
2320
2321         rc = osc_brw_fini_request(req, rc);
2322         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2323
2324         /* restore clear text pages */
2325         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2326
2327         /*
2328          * When server returns -EINPROGRESS, client should always retry
2329          * regardless of the number of times the bulk was resent already.
2330          */
2331         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2332                 if (req->rq_import_generation !=
2333                     req->rq_import->imp_generation) {
2334                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2335                                ""DOSTID", rc = %d.\n",
2336                                req->rq_import->imp_obd->obd_name,
2337                                POSTID(&aa->aa_oa->o_oi), rc);
2338                 } else if (rc == -EINPROGRESS ||
2339                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2340                         rc = osc_brw_redo_request(req, aa, rc);
2341                 } else {
2342                         CERROR("%s: too many resent retries for object: "
2343                                "%llu:%llu, rc = %d.\n",
2344                                req->rq_import->imp_obd->obd_name,
2345                                POSTID(&aa->aa_oa->o_oi), rc);
2346                 }
2347
2348                 if (rc == 0)
2349                         RETURN(0);
2350                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2351                         rc = -EIO;
2352         }
2353
2354         if (rc == 0) {
2355                 struct obdo *oa = aa->aa_oa;
2356                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2357                 unsigned long valid = 0;
2358                 struct cl_object *obj;
2359                 struct osc_async_page *last;
2360
2361                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2362                 obj = osc2cl(last->oap_obj);
2363
2364                 cl_object_attr_lock(obj);
2365                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2366                         attr->cat_blocks = oa->o_blocks;
2367                         valid |= CAT_BLOCKS;
2368                 }
2369                 if (oa->o_valid & OBD_MD_FLMTIME) {
2370                         attr->cat_mtime = oa->o_mtime;
2371                         valid |= CAT_MTIME;
2372                 }
2373                 if (oa->o_valid & OBD_MD_FLATIME) {
2374                         attr->cat_atime = oa->o_atime;
2375                         valid |= CAT_ATIME;
2376                 }
2377                 if (oa->o_valid & OBD_MD_FLCTIME) {
2378                         attr->cat_ctime = oa->o_ctime;
2379                         valid |= CAT_CTIME;
2380                 }
2381
2382                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2383                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2384                         loff_t last_off = last->oap_count + last->oap_obj_off +
2385                                 last->oap_page_off;
2386
2387                         /* Change the file size if this is an out-of-quota or
2388                          * direct I/O write and it extends the file size */
2389                         if (loi->loi_lvb.lvb_size < last_off) {
2390                                 attr->cat_size = last_off;
2391                                 valid |= CAT_SIZE;
2392                         }
2393                         /* Extend KMS if it's not a lockless write */
2394                         if (loi->loi_kms < last_off &&
2395                             oap2osc_page(last)->ops_srvlock == 0) {
2396                                 attr->cat_kms = last_off;
2397                                 valid |= CAT_KMS;
2398                         }
2399                 }
2400
2401                 if (valid != 0)
2402                         cl_object_attr_update(env, obj, attr, valid);
2403                 cl_object_attr_unlock(obj);
2404         }
2405         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2406         aa->aa_oa = NULL;
2407
2408         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2409                 osc_inc_unstable_pages(req);
2410
2411         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2412                 list_del_init(&ext->oe_link);
2413                 osc_extent_finish(env, ext, 1,
2414                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2415         }
2416         LASSERT(list_empty(&aa->aa_exts));
2417         LASSERT(list_empty(&aa->aa_oaps));
2418
2419         transferred = (req->rq_bulk == NULL ? /* short io */
2420                        aa->aa_requested_nob :
2421                        req->rq_bulk->bd_nob_transferred);
2422
2423         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2424         ptlrpc_lprocfs_brw(req, transferred);
2425
2426         spin_lock(&cli->cl_loi_list_lock);
2427         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2428          * is called so we know whether to go to sync BRWs or wait for more
2429          * RPCs to complete */
2430         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2431                 cli->cl_w_in_flight--;
2432         else
2433                 cli->cl_r_in_flight--;
2434         osc_wake_cache_waiters(cli);
2435         spin_unlock(&cli->cl_loi_list_lock);
2436
2437         osc_io_unplug(env, cli, NULL);
2438         RETURN(rc);
2439 }
2440
2441 static void brw_commit(struct ptlrpc_request *req)
2442 {
2443         /* If osc_inc_unstable_pages (via osc_extent_finish) races with this
2444          * commit callback (rq_commit_cb), we need to ensure that
2445          * osc_dec_unstable_pages is still called. Otherwise unstable
2446          * pages may be leaked. */
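        /*
         * Two orderings are possible (sketch; the inc side lives in
         * osc_inc_unstable_pages()): if the interpret callback ran first,
         * rq_unstable is set and we clear it and drop the count here; if
         * this commit callback runs first, rq_committed is set so the inc
         * side can detect it and immediately undo its increment.
         */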
2447         spin_lock(&req->rq_lock);
2448         if (likely(req->rq_unstable)) {
2449                 req->rq_unstable = 0;
2450                 spin_unlock(&req->rq_lock);
2451
2452                 osc_dec_unstable_pages(req);
2453         } else {
2454                 req->rq_committed = 1;
2455                 spin_unlock(&req->rq_lock);
2456         }
2457 }
2458
2459 /**
2460  * Build an RPC from the list of extents @ext_list. The caller must ensure
2461  * that the total number of pages in this list does not exceed max pages per RPC.
2462  * Extents in the list must be in OES_RPC state.
2463  */
2464 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2465                   struct list_head *ext_list, int cmd)
2466 {
2467         struct ptlrpc_request           *req = NULL;
2468         struct osc_extent               *ext;
2469         struct brw_page                 **pga = NULL;
2470         struct osc_brw_async_args       *aa = NULL;
2471         struct obdo                     *oa = NULL;
2472         struct osc_async_page           *oap;
2473         struct osc_object               *obj = NULL;
2474         struct cl_req_attr              *crattr = NULL;
2475         loff_t                          starting_offset = OBD_OBJECT_EOF;
2476         loff_t                          ending_offset = 0;
2477         /* '1' for consistency with code that checks !mpflag to restore */
2478         int mpflag = 1;
2479         int                             mem_tight = 0;
2480         int                             page_count = 0;
2481         bool                            soft_sync = false;
2482         bool                            ndelay = false;
2483         int                             i;
2484         int                             grant = 0;
2485         int                             rc;
2486         __u32                           layout_version = 0;
2487         LIST_HEAD(rpc_list);
2488         struct ost_body                 *body;
2489         ENTRY;
2490         LASSERT(!list_empty(ext_list));
2491
2492         /* add pages into rpc_list to build BRW rpc */
2493         list_for_each_entry(ext, ext_list, oe_link) {
2494                 LASSERT(ext->oe_state == OES_RPC);
2495                 mem_tight |= ext->oe_memalloc;
2496                 grant += ext->oe_grants;
2497                 page_count += ext->oe_nr_pages;
2498                 layout_version = max(layout_version, ext->oe_layout_version);
2499                 if (obj == NULL)
2500                         obj = ext->oe_obj;
2501         }
2502
2503         soft_sync = osc_over_unstable_soft_limit(cli);
2504         if (mem_tight)
2505                 mpflag = memalloc_noreclaim_save();
2506
2507         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2508         if (pga == NULL)
2509                 GOTO(out, rc = -ENOMEM);
2510
2511         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2512         if (oa == NULL)
2513                 GOTO(out, rc = -ENOMEM);
2514
2515         i = 0;
2516         list_for_each_entry(ext, ext_list, oe_link) {
2517                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2518                         if (mem_tight)
2519                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2520                         if (soft_sync)
2521                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2522                         pga[i] = &oap->oap_brw_page;
2523                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2524                         i++;
2525
2526                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2527                         if (starting_offset == OBD_OBJECT_EOF ||
2528                             starting_offset > oap->oap_obj_off)
2529                                 starting_offset = oap->oap_obj_off;
2530                         else
2531                                 LASSERT(oap->oap_page_off == 0);
2532                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2533                                 ending_offset = oap->oap_obj_off +
2534                                                 oap->oap_count;
2535                         else
2536                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2537                                         PAGE_SIZE);
2538                 }
2539                 if (ext->oe_ndelay)
2540                         ndelay = true;
2541         }
2542
2543         /* first page in the list */
2544         oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2545
2546         crattr = &osc_env_info(env)->oti_req_attr;
2547         memset(crattr, 0, sizeof(*crattr));
2548         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2549         crattr->cra_flags = ~0ULL;
2550         crattr->cra_page = oap2cl_page(oap);
2551         crattr->cra_oa = oa;
2552         cl_req_attr_set(env, osc2cl(obj), crattr);
2553
2554         if (cmd == OBD_BRW_WRITE) {
2555                 oa->o_grant_used = grant;
2556                 if (layout_version > 0) {
2557                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2558                                PFID(&oa->o_oi.oi_fid), layout_version);
2559
2560                         oa->o_layout_version = layout_version;
2561                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2562                 }
2563         }
2564
2565         sort_brw_pages(pga, page_count);
2566         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2567         if (rc != 0) {
2568                 CERROR("prep_req failed: %d\n", rc);
2569                 GOTO(out, rc);
2570         }
2571
2572         req->rq_commit_cb = brw_commit;
2573         req->rq_interpret_reply = brw_interpret;
2574         req->rq_memalloc = mem_tight != 0;
2575         oap->oap_request = ptlrpc_request_addref(req);
2576         if (ndelay) {
2577                 req->rq_no_resend = req->rq_no_delay = 1;
2578                 /* Probably set a shorter timeout value here to handle
2579                  * ETIMEDOUT in brw_interpret() correctly. */
2580                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2581         }
2582
2583         /* Need to update the timestamps after the request is built in case
2584          * we race with setattr (locally or in queue at the OST).  If the OST
2585          * gets the later setattr before the earlier BRW (as determined by the
2586          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2587          * is no obvious way to do this in a single call.  bug 10150 */
2588         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2589         crattr->cra_oa = &body->oa;
2590         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2591         cl_req_attr_set(env, osc2cl(obj), crattr);
2592         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2593
2594         aa = ptlrpc_req_async_args(aa, req);
2595         INIT_LIST_HEAD(&aa->aa_oaps);
2596         list_splice_init(&rpc_list, &aa->aa_oaps);
2597         INIT_LIST_HEAD(&aa->aa_exts);
2598         list_splice_init(ext_list, &aa->aa_exts);
2599
2600         spin_lock(&cli->cl_loi_list_lock);
2601         starting_offset >>= PAGE_SHIFT;
2602         if (cmd == OBD_BRW_READ) {
2603                 cli->cl_r_in_flight++;
2604                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2605                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2606                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2607                                       starting_offset + 1);
2608         } else {
2609                 cli->cl_w_in_flight++;
2610                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2611                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2612                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2613                                       starting_offset + 1);
2614         }
2615         spin_unlock(&cli->cl_loi_list_lock);
2616
2617         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2618                   page_count, aa, cli->cl_r_in_flight,
2619                   cli->cl_w_in_flight);
2620         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2621
2622         ptlrpcd_add_req(req);
2623         rc = 0;
2624         EXIT;
2625
2626 out:
2627         if (mem_tight)
2628                 memalloc_noreclaim_restore(mpflag);
2629
2630         if (rc != 0) {
2631                 LASSERT(req == NULL);
2632
2633                 if (oa)
2634                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2635                 if (pga) {
2636                         osc_release_bounce_pages(pga, page_count);
2637                         osc_release_ppga(pga, page_count);
2638                 }
2639         /* This should happen rarely and is pretty bad; it makes the
2640          * pending list not follow the dirty order.
2641          */
2642                 while ((ext = list_first_entry_or_null(ext_list,
2643                                                        struct osc_extent,
2644                                                        oe_link)) != NULL) {
2645                         list_del_init(&ext->oe_link);
2646                         osc_extent_finish(env, ext, 0, rc);
2647                 }
2648         }
2649         RETURN(rc);
2650 }
2651
2652 /* This is to refresh our lock in the face of no RPCs. */
2653 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2654 {
2655         struct ptlrpc_request *req;
2656         struct obdo oa;
2657         struct brw_page bpg = { .off = start, .count = 1};
2658         struct brw_page *pga = &bpg;
2659         int rc;
2660
2661         memset(&oa, 0, sizeof(oa));
2662         oa.o_oi = osc->oo_oinfo->loi_oi;
2663         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2664         /* For updated servers - don't do a read */
2665         oa.o_flags = OBD_FL_NORPC;
2666
2667         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2668                                   &req, 0);
2669
2670         /* If we succeeded, ship it off; if not, there's no point in doing
2671          * anything.  Also no resends, no interpret callback, and no commit
2672          * callback.
2673          */
2674         if (!rc) {
2675                 req->rq_no_resend = 1;
2676                 ptlrpcd_add_req(req);
2677         }
2678 }
2679
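/* Attach \a data as the lock's l_ast_data if none is set yet; return 1 if
 * l_ast_data now matches \a data, 0 if another object already owns it. */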
2680 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2681 {
2682         int set = 0;
2683
2684         LASSERT(lock != NULL);
2685
2686         lock_res_and_lock(lock);
2687
2688         if (lock->l_ast_data == NULL)
2689                 lock->l_ast_data = data;
2690         if (lock->l_ast_data == data)
2691                 set = 1;
2692
2693         unlock_res_and_lock(lock);
2694
2695         return set;
2696 }
2697
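/* Finish an enqueue: for aborted intent enqueues, extract the real error
 * code from the intent reply; mark the LVB ready where appropriate; call
 * \a upcall with the final error code; and drop the reference taken by
 * ldlm_cli_enqueue() once the lock is granted or matched. */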
2698 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2699                      void *cookie, struct lustre_handle *lockh,
2700                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2701                      int errcode)
2702 {
2703         bool intent = *flags & LDLM_FL_HAS_INTENT;
2704         int rc;
2705         ENTRY;
2706
2707         /* The request was created before the ldlm_cli_enqueue() call. */
2708         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2709                 struct ldlm_reply *rep;
2710
2711                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2712                 LASSERT(rep != NULL);
2713
2714                 rep->lock_policy_res1 =
2715                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2716                 if (rep->lock_policy_res1)
2717                         errcode = rep->lock_policy_res1;
2718                 if (!speculative)
2719                         *flags |= LDLM_FL_LVB_READY;
2720         } else if (errcode == ELDLM_OK) {
2721                 *flags |= LDLM_FL_LVB_READY;
2722         }
2723
2724         /* Call the update callback. */
2725         rc = (*upcall)(cookie, lockh, errcode);
2726
2727         /* release the reference taken in ldlm_cli_enqueue() */
2728         if (errcode == ELDLM_LOCK_MATCHED)
2729                 errcode = ELDLM_OK;
2730         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2731                 ldlm_lock_decref(lockh, mode);
2732
2733         RETURN(rc);
2734 }
2735
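/* Interpret callback for an asynchronous enqueue: finish the LDLM part via
 * ldlm_cli_enqueue_fini() and then the OSC part via osc_enqueue_fini().
 * An extra lock reference is held across the upcall so that a blocking AST
 * for a failed lock cannot be handled before the upcall has run. */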
2736 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2737                           void *args, int rc)
2738 {
2739         struct osc_enqueue_args *aa = args;
2740         struct ldlm_lock *lock;
2741         struct lustre_handle *lockh = &aa->oa_lockh;
2742         enum ldlm_mode mode = aa->oa_mode;
2743         struct ost_lvb *lvb = aa->oa_lvb;
2744         __u32 lvb_len = sizeof(*lvb);
2745         __u64 flags = 0;
2746         struct ldlm_enqueue_info einfo = {
2747                 .ei_type = aa->oa_type,
2748                 .ei_mode = mode,
2749         };
2750
2751         ENTRY;
2752
2753         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2754          * be valid. */
2755         lock = ldlm_handle2lock(lockh);
2756         LASSERTF(lock != NULL,
2757                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2758                  lockh->cookie, req, aa);
2759
2760         /* Take an additional reference so that a blocking AST that
2761          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2762          * to arrive after the upcall has been executed by
2763          * osc_enqueue_fini(). */
2764         ldlm_lock_addref(lockh, mode);
2765
2766         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2767         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2768
2769         /* Let the CP AST grant the lock first. */
2770         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2771
2772         if (aa->oa_speculative) {
2773                 LASSERT(aa->oa_lvb == NULL);
2774                 LASSERT(aa->oa_flags == NULL);
2775                 aa->oa_flags = &flags;
2776         }
2777
2778         /* Complete the lock-obtaining procedure. */
2779         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2780                                    lvb, lvb_len, lockh, rc);
2781         /* Complete the OSC-side handling. */
2782         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2783                               aa->oa_flags, aa->oa_speculative, rc);
2784
2785         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2786
2787         ldlm_lock_decref(lockh, mode);
2788         LDLM_LOCK_PUT(lock);
2789         RETURN(rc);
2790 }
2791
2792 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2793  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2794  * with other synchronous requests; however, keeping some locks while trying to
2795  * obtain others may take a considerable amount of time in the case of OST
2796  * failure, and when other sync requests do not get the released lock from a
2797  * client, the client is evicted from the cluster -- such scenarios make life
2798  * difficult, so release locks just after they are obtained. */
2799 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2800                      __u64 *flags, union ldlm_policy_data *policy,
2801                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2802                      void *cookie, struct ldlm_enqueue_info *einfo,
2803                      struct ptlrpc_request_set *rqset, int async,
2804                      bool speculative)
2805 {
2806         struct obd_device *obd = exp->exp_obd;
2807         struct lustre_handle lockh = { 0 };
2808         struct ptlrpc_request *req = NULL;
2809         int intent = *flags & LDLM_FL_HAS_INTENT;
2810         __u64 match_flags = *flags;
2811         enum ldlm_mode mode;
2812         int rc;
2813         ENTRY;
2814
2815         /* Filesystem lock extents are extended to page boundaries so that
2816          * dealing with the page cache is a little smoother.  */
2817         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2818         policy->l_extent.end |= ~PAGE_MASK;
2819
2820         /* Next, search for already existing extent locks that will cover us */
2821         /* If we're trying to read, we also search for an existing PW lock.  The
2822          * VFS and page cache already protect us locally, so lots of readers/
2823          * writers can share a single PW lock.
2824          *
2825          * There are problems with conversion deadlocks, so instead of
2826          * converting a read lock to a write lock, we'll just enqueue a new
2827          * one.
2828          *
2829          * At some point we should cancel the read lock instead of making them
2830          * send us a blocking callback, but there are problems with canceling
2831          * locks out from other users right now, too. */
2832         mode = einfo->ei_mode;
2833         if (einfo->ei_mode == LCK_PR)
2834                 mode |= LCK_PW;
2835         /* Normal lock requests must wait for the LVB to be ready before
2836          * matching a lock; speculative lock requests do not need to,
2837          * because they will not actually use the lock. */
2838         if (!speculative)
2839                 match_flags |= LDLM_FL_LVB_READY;
2840         if (intent != 0)
2841                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2842         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2843                                einfo->ei_type, policy, mode, &lockh);
2844         if (mode) {
2845                 struct ldlm_lock *matched;
2846
2847                 if (*flags & LDLM_FL_TEST_LOCK)
2848                         RETURN(ELDLM_OK);
2849
2850                 matched = ldlm_handle2lock(&lockh);
2851                 if (speculative) {
2852                         /* This DLM lock request is speculative, and does not
2853                          * have an associated IO request.  Therefore, if there
2854                          * is already a DLM lock, it will just inform the
2855                          * caller to cancel the request for this stripe. */
2856                         lock_res_and_lock(matched);
2857                         if (ldlm_extent_equal(&policy->l_extent,
2858                             &matched->l_policy_data.l_extent))
2859                                 rc = -EEXIST;
2860                         else
2861                                 rc = -ECANCELED;
2862                         unlock_res_and_lock(matched);
2863
2864                         ldlm_lock_decref(&lockh, mode);
2865                         LDLM_LOCK_PUT(matched);
2866                         RETURN(rc);
2867                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2868                         *flags |= LDLM_FL_LVB_READY;
2869
2870                         /* We already have a lock, and it's referenced. */
2871                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2872
2873                         ldlm_lock_decref(&lockh, mode);
2874                         LDLM_LOCK_PUT(matched);
2875                         RETURN(ELDLM_OK);
2876                 } else {
2877                         ldlm_lock_decref(&lockh, mode);
2878                         LDLM_LOCK_PUT(matched);
2879                 }
2880         }
2881
2882         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2883                 RETURN(-ENOLCK);
2884
2885         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2886         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2887
2888         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2889                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2890         if (async) {
2891                 if (!rc) {
2892                         struct osc_enqueue_args *aa;
2893                         aa = ptlrpc_req_async_args(aa, req);
2894                         aa->oa_exp         = exp;
2895                         aa->oa_mode        = einfo->ei_mode;
2896                         aa->oa_type        = einfo->ei_type;
2897                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2898                         aa->oa_upcall      = upcall;
2899                         aa->oa_cookie      = cookie;
2900                         aa->oa_speculative = speculative;
2901                         if (!speculative) {
2902                                 aa->oa_flags  = flags;
2903                                 aa->oa_lvb    = lvb;
2904                         } else {
2905                                 /* Speculative locks essentially enqueue
2906                                  * a DLM lock in advance, so we don't care
2907                                  * about the result of the enqueue. */
2908                                 aa->oa_lvb    = NULL;
2909                                 aa->oa_flags  = NULL;
2910                         }
2911
2912                         req->rq_interpret_reply = osc_enqueue_interpret;
2913                         ptlrpc_set_add_req(rqset, req);
2914                 }
2915                 RETURN(rc);
2916         }
2917
2918         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2919                               flags, speculative, rc);
2920
2921         RETURN(rc);
2922 }
2923
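/* Match an already granted extent lock covering \a policy.  If \a obj is
 * given and the matched lock can be bound to it, refresh the object's
 * cached LVB from the lock if necessary; if the lock already belongs to
 * another object, drop the match reference and return 0 (no match). */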
2924 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2925                    struct ldlm_res_id *res_id, enum ldlm_type type,
2926                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2927                    __u64 *flags, struct osc_object *obj,
2928                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2929 {
2930         struct obd_device *obd = exp->exp_obd;
2931         __u64 lflags = *flags;
2932         enum ldlm_mode rc;
2933         ENTRY;
2934
2935         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2936                 RETURN(-EIO);
2937
2938         /* Filesystem lock extents are extended to page boundaries so that
2939          * dealing with the page cache is a little smoother */
2940         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2941         policy->l_extent.end |= ~PAGE_MASK;
2942
2943         /* Next, search for already existing extent locks that will cover us */
2944         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2945                                         res_id, type, policy, mode, lockh,
2946                                         match_flags);
2947         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2948                 RETURN(rc);
2949
2950         if (obj != NULL) {
2951                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2952
2953                 LASSERT(lock != NULL);
2954                 if (osc_set_lock_data(lock, obj)) {
2955                         lock_res_and_lock(lock);
2956                         if (!ldlm_is_lvb_cached(lock)) {
2957                                 LASSERT(lock->l_ast_data == obj);
2958                                 osc_lock_lvb_update(env, obj, lock, NULL);
2959                                 ldlm_set_lvb_cached(lock);
2960                         }
2961                         unlock_res_and_lock(lock);
2962                 } else {
2963                         ldlm_lock_decref(lockh, rc);
2964                         rc = 0;
2965                 }
2966                 LDLM_LOCK_PUT(lock);
2967         }
2968         RETURN(rc);
2969 }
2970
2971 static int osc_statfs_interpret(const struct lu_env *env,
2972                                 struct ptlrpc_request *req, void *args, int rc)
2973 {
2974         struct osc_async_args *aa = args;
2975         struct obd_statfs *msfs;
2976
2977         ENTRY;
2978         if (rc == -EBADR)
2979                 /*
2980                  * The request has in fact never been sent due to issues at
2981                  * a higher level (LOV).  Exit immediately since the caller
2982                  * is aware of the problem and takes care of the clean up.
2983                  */
2984                 RETURN(rc);
2985
2986         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2987             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2988                 GOTO(out, rc = 0);
2989
2990         if (rc != 0)
2991                 GOTO(out, rc);
2992
2993         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2994         if (msfs == NULL)
2995                 GOTO(out, rc = -EPROTO);
2996
2997         *aa->aa_oi->oi_osfs = *msfs;
2998 out:
2999         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3000
3001         RETURN(rc);
3002 }
3003
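/* Asynchronous statfs: results newer than \a max_age are copied from the
 * cached obd_statfs; otherwise an OST_STATFS request is added to \a rqset
 * and oi_cb_up() is called from osc_statfs_interpret() on completion.
 *
 * A minimal caller sketch (illustrative only -- in practice this is
 * reached via the o_statfs_async method; osfs, my_cb, exp, env and
 * max_age are assumed from the caller's context):
 *
 *	struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *	struct obd_info oinfo = { .oi_osfs = &osfs, .oi_cb_up = my_cb };
 *
 *	rc = osc_statfs_async(exp, &oinfo, max_age, set);
 *	if (rc == 0)
 *		rc = ptlrpc_set_wait(env, set);
 *	ptlrpc_set_destroy(set);
 */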
3004 static int osc_statfs_async(struct obd_export *exp,
3005                             struct obd_info *oinfo, time64_t max_age,
3006                             struct ptlrpc_request_set *rqset)
3007 {
3008         struct obd_device     *obd = class_exp2obd(exp);
3009         struct ptlrpc_request *req;
3010         struct osc_async_args *aa;
3011         int rc;
3012         ENTRY;
3013
3014         if (obd->obd_osfs_age >= max_age) {
3015                 CDEBUG(D_SUPER,
3016                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3017                        obd->obd_name, &obd->obd_osfs,
3018                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3019                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3020                 spin_lock(&obd->obd_osfs_lock);
3021                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3022                 spin_unlock(&obd->obd_osfs_lock);
3023                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3024                 if (oinfo->oi_cb_up)
3025                         oinfo->oi_cb_up(oinfo, 0);
3026
3027                 RETURN(0);
3028         }
3029
3030         /* We could possibly pass max_age in the request (as an absolute
3031          * timestamp or a "seconds.usec ago") so the target can avoid doing
3032          * extra calls into the filesystem if that isn't necessary (e.g.
3033          * during mount that would help a bit).  Having relative timestamps
3034          * is not so great if request processing is slow, while absolute
3035          * timestamps are not ideal because they need time synchronization. */
3036         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3037         if (req == NULL)
3038                 RETURN(-ENOMEM);
3039
3040         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3041         if (rc) {
3042                 ptlrpc_request_free(req);
3043                 RETURN(rc);
3044         }
3045         ptlrpc_request_set_replen(req);
3046         req->rq_request_portal = OST_CREATE_PORTAL;
3047         ptlrpc_at_set_req_timeout(req);
3048
3049         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3050                 /* procfs requests must not wait or resend, to avoid deadlock */
3051                 req->rq_no_resend = 1;
3052                 req->rq_no_delay = 1;
3053         }
3054
3055         req->rq_interpret_reply = osc_statfs_interpret;
3056         aa = ptlrpc_req_async_args(aa, req);
3057         aa->aa_oi = oinfo;
3058
3059         ptlrpc_set_add_req(rqset, req);
3060         RETURN(0);
3061 }
3062
3063 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3064                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3065 {
3066         struct obd_device     *obd = class_exp2obd(exp);
3067         struct obd_statfs     *msfs;
3068         struct ptlrpc_request *req;
3069         struct obd_import     *imp, *imp0;
3070         int rc;
3071         ENTRY;
3072
3073         /* Since the request might also come from lprocfs, we need to
3074          * sync this with client_disconnect_export().  Bug15684
3075          */
3076         with_imp_locked(obd, imp0, rc)
3077                 imp = class_import_get(imp0);
3078         if (rc)
3079                 RETURN(rc);
3080
3081         /* We could possibly pass max_age in the request (as an absolute
3082          * timestamp or a "seconds.usec ago") so the target can avoid doing
3083          * extra calls into the filesystem if that isn't necessary (e.g.
3084          * during mount that would help a bit).  Having relative timestamps
3085          * is not so great if request processing is slow, while absolute
3086          * timestamps are not ideal because they need time synchronization. */
3087         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3088
3089         class_import_put(imp);
3090
3091         if (req == NULL)
3092                 RETURN(-ENOMEM);
3093
3094         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3095         if (rc) {
3096                 ptlrpc_request_free(req);
3097                 RETURN(rc);
3098         }
3099         ptlrpc_request_set_replen(req);
3100         req->rq_request_portal = OST_CREATE_PORTAL;
3101         ptlrpc_at_set_req_timeout(req);
3102
3103         if (flags & OBD_STATFS_NODELAY) {
3104                 /* procfs requests must not wait or resend, to avoid deadlock */
3105                 req->rq_no_resend = 1;
3106                 req->rq_no_delay = 1;
3107         }
3108
3109         rc = ptlrpc_queue_wait(req);
3110         if (rc)
3111                 GOTO(out, rc);
3112
3113         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3114         if (msfs == NULL)
3115                 GOTO(out, rc = -EPROTO);
3116
3117         *osfs = *msfs;
3118
3119         EXIT;
3120 out:
3121         ptlrpc_req_finished(req);
3122         return rc;
3123 }
3124
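/* Handle the few ioctls the OSC implements directly: OBD_IOC_CLIENT_RECOVER
 * forces import recovery and IOC_OSC_SET_ACTIVE (de)activates the import;
 * anything else fails with -ENOTTY. */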
3125 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3126                          void *karg, void __user *uarg)
3127 {
3128         struct obd_device *obd = exp->exp_obd;
3129         struct obd_ioctl_data *data = karg;
3130         int rc = 0;
3131
3132         ENTRY;
3133         if (!try_module_get(THIS_MODULE)) {
3134                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3135                        module_name(THIS_MODULE));
3136                 return -EINVAL;
3137         }
3138         switch (cmd) {
3139         case OBD_IOC_CLIENT_RECOVER:
3140                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3141                                            data->ioc_inlbuf1, 0);
3142                 if (rc > 0)
3143                         rc = 0;
3144                 break;
3145         case IOC_OSC_SET_ACTIVE:
3146                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3147                                               data->ioc_offset);
3148                 break;
3149         default:
3150                 rc = -ENOTTY;
3151                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3152                        obd->obd_name, cmd, current->comm, rc);
3153                 break;
3154         }
3155
3156         module_put(THIS_MODULE);
3157         return rc;
3158 }
3159
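/* Set a named parameter: checksum, sptlrpc and LRU-shrink keys are handled
 * locally; all other keys are packed into an OST_SET_INFO (or, for grant
 * shrinking, OST_SET_GRANT_INFO) request.  Grant-shrink requests are handed
 * to ptlrpcd; everything else requires a caller-supplied request \a set. */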
3160 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3161                        u32 keylen, void *key, u32 vallen, void *val,
3162                        struct ptlrpc_request_set *set)
3163 {
3164         struct ptlrpc_request *req;
3165         struct obd_device     *obd = exp->exp_obd;
3166         struct obd_import     *imp = class_exp2cliimp(exp);
3167         char                  *tmp;
3168         int                    rc;
3169         ENTRY;
3170
3171         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3172
3173         if (KEY_IS(KEY_CHECKSUM)) {
3174                 if (vallen != sizeof(int))
3175                         RETURN(-EINVAL);
3176                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3177                 RETURN(0);
3178         }
3179
3180         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3181                 sptlrpc_conf_client_adapt(obd);
3182                 RETURN(0);
3183         }
3184
3185         if (KEY_IS(KEY_FLUSH_CTX)) {
3186                 sptlrpc_import_flush_my_ctx(imp);
3187                 RETURN(0);
3188         }
3189
3190         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3191                 struct client_obd *cli = &obd->u.cli;
3192                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3193                 long target = *(long *)val;
3194
3195                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3196                 *(long *)val -= nr;
3197                 RETURN(0);
3198         }
3199
3200         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3201                 RETURN(-EINVAL);
3202
3203         /* We pass all other commands directly to the OST. Since nobody calls
3204            osc methods directly and everybody is supposed to go through LOV, we
3205            assume LOV has checked invalid values for us.
3206            The only recognised values so far are evict_by_nid and mds_conn.
3207            Even if something bad goes through, we'd get a -EINVAL from the OST
3208            anyway. */
3209
3210         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3211                                                 &RQF_OST_SET_GRANT_INFO :
3212                                                 &RQF_OBD_SET_INFO);
3213         if (req == NULL)
3214                 RETURN(-ENOMEM);
3215
3216         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3217                              RCL_CLIENT, keylen);
3218         if (!KEY_IS(KEY_GRANT_SHRINK))
3219                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3220                                      RCL_CLIENT, vallen);
3221         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3222         if (rc) {
3223                 ptlrpc_request_free(req);
3224                 RETURN(rc);
3225         }
3226
3227         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3228         memcpy(tmp, key, keylen);
3229         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3230                                                         &RMF_OST_BODY :
3231                                                         &RMF_SETINFO_VAL);
3232         memcpy(tmp, val, vallen);
3233
3234         if (KEY_IS(KEY_GRANT_SHRINK)) {
3235                 struct osc_grant_args *aa;
3236                 struct obdo *oa;
3237
3238                 aa = ptlrpc_req_async_args(aa, req);
3239                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3240                 if (!oa) {
3241                         ptlrpc_req_finished(req);
3242                         RETURN(-ENOMEM);
3243                 }
3244                 *oa = ((struct ost_body *)val)->oa;
3245                 aa->aa_oa = oa;
3246                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3247         }
3248
3249         ptlrpc_request_set_replen(req);
3250         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3251                 LASSERT(set != NULL);
3252                 ptlrpc_set_add_req(set, req);
3253                 ptlrpc_check_set(NULL, set);
3254         } else {
3255                 ptlrpcd_add_req(req);
3256         }
3257
3258         RETURN(0);
3259 }
3260 EXPORT_SYMBOL(osc_set_info_async);
3261
3262 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3263                   struct obd_device *obd, struct obd_uuid *cluuid,
3264                   struct obd_connect_data *data, void *localdata)
3265 {
3266         struct client_obd *cli = &obd->u.cli;
3267
3268         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3269                 long lost_grant;
3270                 long grant;
3271
3272                 spin_lock(&cli->cl_loi_list_lock);
3273                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3274                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3275                         /* restore ocd_grant_blkbits as client page bits */
3276                         data->ocd_grant_blkbits = PAGE_SHIFT;
3277                         grant += cli->cl_dirty_grant;
3278                 } else {
3279                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3280                 }
3281                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3282                 lost_grant = cli->cl_lost_grant;
3283                 cli->cl_lost_grant = 0;
3284                 spin_unlock(&cli->cl_loi_list_lock);
3285
3286                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3287                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3288                        data->ocd_version, data->ocd_grant, lost_grant);
3289         }
3290
3291         RETURN(0);
3292 }
3293 EXPORT_SYMBOL(osc_reconnect);
3294
3295 int osc_disconnect(struct obd_export *exp)
3296 {
3297         struct obd_device *obd = class_exp2obd(exp);
3298         int rc;
3299
3300         rc = client_disconnect_export(exp);
3301         /**
3302          * Initially we put del_shrink_grant before disconnect_export, but it
3303          * causes the following problem if setup (connect) and cleanup
3304          * (disconnect) are tangled together.
3305          *      connect p1                     disconnect p2
3306          *   ptlrpc_connect_import
3307          *     ...............               class_manual_cleanup
3308          *                                     osc_disconnect
3309          *                                     del_shrink_grant
3310          *   ptlrpc_connect_interrupt
3311          *     osc_init_grant
3312          *   add this client to shrink list
3313          *                                      cleanup_osc
3314          * Bang! The grant shrink thread triggers the shrink. BUG18662
3315          */
3316         osc_del_grant_list(&obd->u.cli);
3317         return rc;
3318 }
3319 EXPORT_SYMBOL(osc_disconnect);
3320
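/* Hash-iteration callback used on import invalidation: pick up the
 * osc_object cached in l_ast_data of this resource's granted locks so it
 * can be invalidated, and clear LDLM_FL_CLEANED so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() cancels the locks.
 * \a arg is the struct lu_env passed by cfs_hash_for_each_nolock(). */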
3321 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3322                                  struct hlist_node *hnode, void *arg)
3323 {
3324         struct lu_env *env = arg;
3325         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3326         struct ldlm_lock *lock;
3327         struct osc_object *osc = NULL;
3328         ENTRY;
3329
3330         lock_res(res);
3331         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3332                 if (lock->l_ast_data != NULL && osc == NULL) {
3333                         osc = lock->l_ast_data;
3334                         cl_object_get(osc2cl(osc));
3335                 }
3336
3337                 /* Clear the LDLM_FL_CLEANED flag to make sure it will be
3338                  * canceled by the 2nd round of the ldlm_namespace_cleanup()
3339                  * call in osc_import_event(). */
3340                 ldlm_clear_cleaned(lock);
3341         }
3342         unlock_res(res);
3343
3344         if (osc != NULL) {
3345                 osc_object_invalidate(env, osc);
3346                 cl_object_put(env, osc2cl(osc));
3347         }
3348
3349         RETURN(0);
3350 }
3351 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3352
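/* Import event hook: reset grant accounting on disconnect, flush cached IO
 * and invalidate cached locks/objects on import invalidation, (re)initialize
 * grants when the connect data arrives, and notify the OBD observer of
 * activation state changes. */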
3353 static int osc_import_event(struct obd_device *obd,
3354                             struct obd_import *imp,
3355                             enum obd_import_event event)
3356 {
3357         struct client_obd *cli;
3358         int rc = 0;
3359
3360         ENTRY;
3361         LASSERT(imp->imp_obd == obd);
3362
3363         switch (event) {
3364         case IMP_EVENT_DISCON: {
3365                 cli = &obd->u.cli;
3366                 spin_lock(&cli->cl_loi_list_lock);
3367                 cli->cl_avail_grant = 0;
3368                 cli->cl_lost_grant = 0;
3369                 spin_unlock(&cli->cl_loi_list_lock);
3370                 break;
3371         }
3372         case IMP_EVENT_INACTIVE: {
3373                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3374                 break;
3375         }
3376         case IMP_EVENT_INVALIDATE: {
3377                 struct ldlm_namespace *ns = obd->obd_namespace;
3378                 struct lu_env         *env;
3379                 __u16                  refcheck;
3380
3381                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3382
3383                 env = cl_env_get(&refcheck);
3384                 if (!IS_ERR(env)) {
3385                         osc_io_unplug(env, &obd->u.cli, NULL);
3386
3387                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3388                                                  osc_ldlm_resource_invalidate,
3389                                                  env, 0);
3390                         cl_env_put(env, &refcheck);
3391
3392                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3393                 } else
3394                         rc = PTR_ERR(env);
3395                 break;
3396         }
3397         case IMP_EVENT_ACTIVE: {
3398                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3399                 break;
3400         }
3401         case IMP_EVENT_OCD: {
3402                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3403
3404                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3405                         osc_init_grant(&obd->u.cli, ocd);
3406
3407                 /* See bug 7198 */
3408                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3409                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3410
3411                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3412                 break;
3413         }
3414         case IMP_EVENT_DEACTIVATE: {
3415                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3416                 break;
3417         }
3418         case IMP_EVENT_ACTIVATE: {
3419                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3420                 break;
3421         }
3422         default:
3423                 CERROR("Unknown import event %d\n", event);
3424                 LBUG();
3425         }
3426         RETURN(rc);
3427 }
3428
3429 /**
3430  * Determine whether the lock can be canceled before replaying the lock
3431  * during recovery, see bug16774 for detailed information.
3432  *
3433  * \retval zero the lock can't be canceled
3434  * \retval other ok to cancel
3435  */
3436 static int osc_cancel_weight(struct ldlm_lock *lock)
3437 {
3438         /*
3439          * Cancel all unused and granted extent locks.
3440          */
3441         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3442             ldlm_is_granted(lock) &&
3443             osc_ldlm_weigh_ast(lock) == 0)
3444                 RETURN(1);
3445
3446         RETURN(0);
3447 }
3448
3449 static int brw_queue_work(const struct lu_env *env, void *data)
3450 {
3451         struct client_obd *cli = data;
3452
3453         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3454
3455         osc_io_unplug(env, cli, NULL);
3456         RETURN(0);
3457 }
3458
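/* Common part of OSC device setup: take a ptlrpcd reference, perform the
 * generic client OBD setup, and allocate the ptlrpcd work items used for
 * writeback and LRU shrinking; quota setup and the grant shrink interval
 * complete the initialization. */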
3459 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3460 {
3461         struct client_obd *cli = &obd->u.cli;
3462         void *handler;
3463         int rc;
3464
3465         ENTRY;
3466
3467         rc = ptlrpcd_addref();
3468         if (rc)
3469                 RETURN(rc);
3470
3471         rc = client_obd_setup(obd, lcfg);
3472         if (rc)
3473                 GOTO(out_ptlrpcd, rc);
3474
3476         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3477         if (IS_ERR(handler))
3478                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3479         cli->cl_writeback_work = handler;
3480
3481         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3482         if (IS_ERR(handler))
3483                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3484         cli->cl_lru_work = handler;
3485
3486         rc = osc_quota_setup(obd);
3487         if (rc)
3488                 GOTO(out_ptlrpcd_work, rc);
3489
3490         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3491         osc_update_next_shrink(cli);
3492
3493         RETURN(rc);
3494
3495 out_ptlrpcd_work:
3496         if (cli->cl_writeback_work != NULL) {
3497                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3498                 cli->cl_writeback_work = NULL;
3499         }
3500         if (cli->cl_lru_work != NULL) {
3501                 ptlrpcd_destroy_work(cli->cl_lru_work);
3502                 cli->cl_lru_work = NULL;
3503         }
3504         client_obd_cleanup(obd);
3505 out_ptlrpcd:
3506         ptlrpcd_decref();
3507         RETURN(rc);
3508 }
3509 EXPORT_SYMBOL(osc_setup_common);
3510
3511 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3512 {
3513         struct client_obd *cli = &obd->u.cli;
3514         int                adding;
3515         int                added;
3516         int                req_count;
3517         int                rc;
3518
3519         ENTRY;
3520
3521         rc = osc_setup_common(obd, lcfg);
3522         if (rc < 0)
3523                 RETURN(rc);
3524
3525         rc = osc_tunables_init(obd);
3526         if (rc)
3527                 RETURN(rc);
3528
3529         /*
3530          * We try to control the total number of requests with an upper limit
3531          * osc_reqpool_maxreqcount. There might be some race which causes
3532          * over-limit allocation, but it is fine.
3533          */
3534         req_count = atomic_read(&osc_pool_req_count);
3535         if (req_count < osc_reqpool_maxreqcount) {
3536                 adding = cli->cl_max_rpcs_in_flight + 2;
3537                 if (req_count + adding > osc_reqpool_maxreqcount)
3538                         adding = osc_reqpool_maxreqcount - req_count;
3539
3540                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3541                 atomic_add(added, &osc_pool_req_count);
3542         }
3543
3544         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3545
3546         spin_lock(&osc_shrink_lock);
3547         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3548         spin_unlock(&osc_shrink_lock);
3549         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3550         cli->cl_import->imp_idle_debug = D_HA;
3551
3552         RETURN(0);
3553 }
3554
3555 int osc_precleanup_common(struct obd_device *obd)
3556 {
3557         struct client_obd *cli = &obd->u.cli;
3558         ENTRY;
3559
3560         /* LU-464
3561          * for echo client, export may be on zombie list, wait for
3562          * zombie thread to cull it, because cli.cl_import will be
3563          * cleared in client_disconnect_export():
3564          *   class_export_destroy() -> obd_cleanup() ->
3565          *   echo_device_free() -> echo_client_cleanup() ->
3566          *   obd_disconnect() -> osc_disconnect() ->
3567          *   client_disconnect_export()
3568          */
3569         obd_zombie_barrier();
3570         if (cli->cl_writeback_work) {
3571                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3572                 cli->cl_writeback_work = NULL;
3573         }
3574
3575         if (cli->cl_lru_work) {
3576                 ptlrpcd_destroy_work(cli->cl_lru_work);
3577                 cli->cl_lru_work = NULL;
3578         }
3579
3580         obd_cleanup_client_import(obd);
3581         RETURN(0);
3582 }
3583 EXPORT_SYMBOL(osc_precleanup_common);
3584
3585 static int osc_precleanup(struct obd_device *obd)
3586 {
3587         ENTRY;
3588
3589         osc_precleanup_common(obd);
3590
3591         ptlrpc_lprocfs_unregister_obd(obd);
3592         RETURN(0);
3593 }
3594
3595 int osc_cleanup_common(struct obd_device *obd)
3596 {
3597         struct client_obd *cli = &obd->u.cli;
3598         int rc;
3599
3600         ENTRY;
3601
3602         spin_lock(&osc_shrink_lock);
3603         list_del(&cli->cl_shrink_list);
3604         spin_unlock(&osc_shrink_lock);
3605
3606         /* lru cleanup */
3607         if (cli->cl_cache != NULL) {
3608                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3609                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3610                 list_del_init(&cli->cl_lru_osc);
3611                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3612                 cli->cl_lru_left = NULL;
3613                 cl_cache_decref(cli->cl_cache);
3614                 cli->cl_cache = NULL;
3615         }
3616
3617         /* free memory of osc quota cache */
3618         osc_quota_cleanup(obd);
3619
3620         rc = client_obd_cleanup(obd);
3621
3622         ptlrpcd_decref();
3623         RETURN(rc);
3624 }
3625 EXPORT_SYMBOL(osc_cleanup_common);
3626
3627 static const struct obd_ops osc_obd_ops = {
3628         .o_owner                = THIS_MODULE,
3629         .o_setup                = osc_setup,
3630         .o_precleanup           = osc_precleanup,
3631         .o_cleanup              = osc_cleanup_common,
3632         .o_add_conn             = client_import_add_conn,
3633         .o_del_conn             = client_import_del_conn,
3634         .o_connect              = client_connect_import,
3635         .o_reconnect            = osc_reconnect,
3636         .o_disconnect           = osc_disconnect,
3637         .o_statfs               = osc_statfs,
3638         .o_statfs_async         = osc_statfs_async,
3639         .o_create               = osc_create,
3640         .o_destroy              = osc_destroy,
3641         .o_getattr              = osc_getattr,
3642         .o_setattr              = osc_setattr,
3643         .o_iocontrol            = osc_iocontrol,
3644         .o_set_info_async       = osc_set_info_async,
3645         .o_import_event         = osc_import_event,
3646         .o_quotactl             = osc_quotactl,
3647 };
3648
3649 LIST_HEAD(osc_shrink_list);
3650 DEFINE_SPINLOCK(osc_shrink_lock);
3651
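/* Kernels since 3.12 split the shrinker interface into count_objects() and
 * scan_objects(); older kernels use a single ->shrink() callback that both
 * scans and reports the remaining object count, emulated below. */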
3652 #ifdef HAVE_SHRINKER_COUNT
3653 static struct shrinker osc_cache_shrinker = {
3654         .count_objects  = osc_cache_shrink_count,
3655         .scan_objects   = osc_cache_shrink_scan,
3656         .seeks          = DEFAULT_SEEKS,
3657 };
3658 #else
3659 static int osc_cache_shrink(struct shrinker *shrinker,
3660                             struct shrink_control *sc)
3661 {
3662         (void)osc_cache_shrink_scan(shrinker, sc);
3663
3664         return osc_cache_shrink_count(shrinker, sc);
3665 }
3666
3667 static struct shrinker osc_cache_shrinker = {
3668         .shrink   = osc_cache_shrink,
3669         .seeks    = DEFAULT_SEEKS,
3670 };
3671 #endif
3672
3673 static int __init osc_init(void)
3674 {
3675         unsigned int reqpool_size;
3676         unsigned int reqsize;
3677         int rc;
3678         ENTRY;
3679
3680         /* Print the address of _any_ initialized kernel symbol from this
3681          * module, to allow debugging with a gdb that doesn't support data
3682          * symbols from modules. */
3683         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3684
3685         rc = lu_kmem_init(osc_caches);
3686         if (rc)
3687                 RETURN(rc);
3688
3689         rc = class_register_type(&osc_obd_ops, NULL, true,
3690                                  LUSTRE_OSC_NAME, &osc_device_type);
3691         if (rc)
3692                 GOTO(out_kmem, rc);
3693
3694         rc = register_shrinker(&osc_cache_shrinker);
3695         if (rc)
3696                 GOTO(out_type, rc);
3697
3698         /* This is obviously too much memory; only prevent overflow here */
3699         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3700                 GOTO(out_shrinker, rc = -EINVAL);
3701
3702         reqpool_size = osc_reqpool_mem_max << 20;
3703
3704         reqsize = 1;
3705         while (reqsize < OST_IO_MAXREQSIZE)
3706                 reqsize = reqsize << 1;
3707
3708         /*
3709          * We don't enlarge the request count in the OSC pool according to
3710          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3711          * after normal allocation fails, so a small OSC pool won't cause
3712          * much performance degradation in most cases.
3713          */
3714         osc_reqpool_maxreqcount = reqpool_size / reqsize;
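        /* Worked example (illustrative numbers only, not actual constants):
         * with the default osc_reqpool_mem_max = 5 the budget is 5 MiB; if
         * OST_IO_MAXREQSIZE were 640 KiB, reqsize would round up to 1 MiB
         * and osc_reqpool_maxreqcount would be 5 MiB / 1 MiB = 5 requests. */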
3715
3716         atomic_set(&osc_pool_req_count, 0);
3717         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3718                                           ptlrpc_add_rqs_to_pool);
3719
3720         if (osc_rq_pool == NULL)
3721                 GOTO(out_shrinker, rc = -ENOMEM);
3722
3723         rc = osc_start_grant_work();
3724         if (rc != 0)
3725                 GOTO(out_req_pool, rc);
3726
3727         RETURN(rc);
3728
3729 out_req_pool:
3730         ptlrpc_free_rq_pool(osc_rq_pool);
3731 out_shrinker:
3732         unregister_shrinker(&osc_cache_shrinker);
3733 out_type:
3734         class_unregister_type(LUSTRE_OSC_NAME);
3735 out_kmem:
3736         lu_kmem_fini(osc_caches);
3737
3738         RETURN(rc);
3739 }
3740
3741 static void __exit osc_exit(void)
3742 {
3743         osc_stop_grant_work();
3744         unregister_shrinker(&osc_cache_shrinker);
3745         class_unregister_type(LUSTRE_OSC_NAME);
3746         lu_kmem_fini(osc_caches);
3747         ptlrpc_free_rq_pool(osc_rq_pool);
3748 }
3749
3750 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3751 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3752 MODULE_VERSION(LUSTRE_VERSION_STRING);
3753 MODULE_LICENSE("GPL");
3754
3755 module_init(osc_init);
3756 module_exit(osc_exit);