lustre/osc/osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_OSC
33
34 #include <linux/workqueue.h>
35 #include <libcfs/libcfs.h>
36 #include <linux/falloc.h>
37 #include <lprocfs_status.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49
50 #include "osc_internal.h"
51
52 atomic_t osc_pool_req_count;
53 unsigned int osc_reqpool_maxreqcount;
54 struct ptlrpc_request_pool *osc_rq_pool;
55
56 /* max memory used for request pool, unit is MB */
57 static unsigned int osc_reqpool_mem_max = 5;
58 module_param(osc_reqpool_mem_max, uint, 0444);
59
60 static unsigned int osc_idle_timeout = 20;
61 module_param(osc_idle_timeout, uint, 0644);
62
63 #define osc_grant_args osc_brw_async_args
64
65 struct osc_setattr_args {
66         struct obdo             *sa_oa;
67         obd_enqueue_update_f     sa_upcall;
68         void                    *sa_cookie;
69 };
70
71 struct osc_fsync_args {
72         struct osc_object       *fa_obj;
73         struct obdo             *fa_oa;
74         obd_enqueue_update_f    fa_upcall;
75         void                    *fa_cookie;
76 };
77
78 struct osc_ladvise_args {
79         struct obdo             *la_oa;
80         obd_enqueue_update_f     la_upcall;
81         void                    *la_cookie;
82 };
83
84 static void osc_release_ppga(struct brw_page **ppga, size_t count);
85 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
86                          void *data, int rc);
87
88 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 {
90         struct ost_body *body;
91
92         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
93         LASSERT(body);
94
95         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
96 }
97
98 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
99                        struct obdo *oa)
100 {
101         struct ptlrpc_request   *req;
102         struct ost_body         *body;
103         int                      rc;
104
105         ENTRY;
106         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
107         if (req == NULL)
108                 RETURN(-ENOMEM);
109
110         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
111         if (rc) {
112                 ptlrpc_request_free(req);
113                 RETURN(rc);
114         }
115
116         osc_pack_req_body(req, oa);
117
118         ptlrpc_request_set_replen(req);
119
120         rc = ptlrpc_queue_wait(req);
121         if (rc)
122                 GOTO(out, rc);
123
124         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125         if (body == NULL)
126                 GOTO(out, rc = -EPROTO);
127
128         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
129         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
130
131         oa->o_blksize = cli_brw_size(exp->exp_obd);
132         oa->o_valid |= OBD_MD_FLBLKSZ;
133
134         EXIT;
135 out:
136         ptlrpc_req_finished(req);
137
138         return rc;
139 }
140
141 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
142                        struct obdo *oa)
143 {
144         struct ptlrpc_request   *req;
145         struct ost_body         *body;
146         int                      rc;
147
148         ENTRY;
149         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150
151         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
152         if (req == NULL)
153                 RETURN(-ENOMEM);
154
155         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156         if (rc) {
157                 ptlrpc_request_free(req);
158                 RETURN(rc);
159         }
160
161         osc_pack_req_body(req, oa);
162
163         ptlrpc_request_set_replen(req);
164
165         rc = ptlrpc_queue_wait(req);
166         if (rc)
167                 GOTO(out, rc);
168
169         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170         if (body == NULL)
171                 GOTO(out, rc = -EPROTO);
172
173         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
174
175         EXIT;
176 out:
177         ptlrpc_req_finished(req);
178
179         RETURN(rc);
180 }
181
182 static int osc_setattr_interpret(const struct lu_env *env,
183                                  struct ptlrpc_request *req, void *args, int rc)
184 {
185         struct osc_setattr_args *sa = args;
186         struct ost_body *body;
187
188         ENTRY;
189
190         if (rc != 0)
191                 GOTO(out, rc);
192
193         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194         if (body == NULL)
195                 GOTO(out, rc = -EPROTO);
196
197         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
198                              &body->oa);
199 out:
200         rc = sa->sa_upcall(sa->sa_cookie, rc);
201         RETURN(rc);
202 }
203
204 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
205                       obd_enqueue_update_f upcall, void *cookie,
206                       struct ptlrpc_request_set *rqset)
207 {
208         struct ptlrpc_request   *req;
209         struct osc_setattr_args *sa;
210         int                      rc;
211
212         ENTRY;
213
214         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
215         if (req == NULL)
216                 RETURN(-ENOMEM);
217
218         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219         if (rc) {
220                 ptlrpc_request_free(req);
221                 RETURN(rc);
222         }
223
224         osc_pack_req_body(req, oa);
225
226         ptlrpc_request_set_replen(req);
227
228         /* do mds to ost setattr asynchronously */
229         if (!rqset) {
230                 /* Do not wait for response. */
231                 ptlrpcd_add_req(req);
232         } else {
233                 req->rq_interpret_reply = osc_setattr_interpret;
234
235                 sa = ptlrpc_req_async_args(sa, req);
236                 sa->sa_oa = oa;
237                 sa->sa_upcall = upcall;
238                 sa->sa_cookie = cookie;
239
240                 ptlrpc_set_add_req(rqset, req);
241         }
242
243         RETURN(0);
244 }
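/*
 * Usage sketch (illustrative only, not part of the original file): how a
 * caller might drive osc_setattr_async().  example_* names are
 * hypothetical; the upcall signature mirrors the sa_upcall(sa_cookie, rc)
 * call made by osc_setattr_interpret() above.
 */
#if 0
static int example_setattr_done(void *cookie, int rc)
{
	/* cookie is the value passed to osc_setattr_async() below (NULL) */
	CDEBUG(D_INODE, "setattr finished: rc = %d\n", rc);
	return rc;
}

static int example_do_setattr(struct obd_export *exp, struct obdo *oa,
			      struct ptlrpc_request_set *rqset)
{
	/* with a non-NULL rqset the reply is interpreted and the upcall
	 * fires; with rqset == NULL the request is fire-and-forget and
	 * the upcall is never called */
	return osc_setattr_async(exp, oa, example_setattr_done, NULL, rqset);
}
#endif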
245
246 static int osc_ladvise_interpret(const struct lu_env *env,
247                                  struct ptlrpc_request *req,
248                                  void *arg, int rc)
249 {
250         struct osc_ladvise_args *la = arg;
251         struct ost_body *body;
252         ENTRY;
253
254         if (rc != 0)
255                 GOTO(out, rc);
256
257         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
258         if (body == NULL)
259                 GOTO(out, rc = -EPROTO);
260
261         *la->la_oa = body->oa;
262 out:
263         rc = la->la_upcall(la->la_cookie, rc);
264         RETURN(rc);
265 }
266
267 /**
268  * If rqset is NULL, do not wait for the response; upcall and cookie may
269  * also be NULL in that case.  See the usage sketch after this function.
270  */
271 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
272                      struct ladvise_hdr *ladvise_hdr,
273                      obd_enqueue_update_f upcall, void *cookie,
274                      struct ptlrpc_request_set *rqset)
275 {
276         struct ptlrpc_request   *req;
277         struct ost_body         *body;
278         struct osc_ladvise_args *la;
279         int                      rc;
280         struct lu_ladvise       *req_ladvise;
281         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
282         int                      num_advise = ladvise_hdr->lah_count;
283         struct ladvise_hdr      *req_ladvise_hdr;
284         ENTRY;
285
286         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
287         if (req == NULL)
288                 RETURN(-ENOMEM);
289
290         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
291                              num_advise * sizeof(*ladvise));
292         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
293         if (rc != 0) {
294                 ptlrpc_request_free(req);
295                 RETURN(rc);
296         }
297         req->rq_request_portal = OST_IO_PORTAL;
298         ptlrpc_at_set_req_timeout(req);
299
300         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
301         LASSERT(body);
302         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
303                              oa);
304
305         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
306                                                  &RMF_OST_LADVISE_HDR);
307         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
308
309         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
310         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
311         ptlrpc_request_set_replen(req);
312
313         if (rqset == NULL) {
314                 /* Do not wait for response. */
315                 ptlrpcd_add_req(req);
316                 RETURN(0);
317         }
318
319         req->rq_interpret_reply = osc_ladvise_interpret;
320         la = ptlrpc_req_async_args(la, req);
321         la->la_oa = oa;
322         la->la_upcall = upcall;
323         la->la_cookie = cookie;
324
325         ptlrpc_set_add_req(rqset, req);
326
327         RETURN(0);
328 }
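/*
 * Usage sketch (illustrative only) for the two calling modes described in
 * the comment above osc_ladvise_base().  example_* names are hypothetical.
 */
#if 0
static int example_ladvise_done(void *cookie, int rc)
{
	return rc;
}

static void example_ladvise(struct obd_export *exp, struct obdo *oa,
			    struct ladvise_hdr *hdr,
			    struct ptlrpc_request_set *rqset)
{
	/* fire-and-forget: rqset is NULL, so upcall/cookie may be NULL too */
	osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);

	/* tracked: the reply is interpreted and example_ladvise_done() runs */
	osc_ladvise_base(exp, oa, hdr, example_ladvise_done, NULL, rqset);
}
#endif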
329
330 static int osc_create(const struct lu_env *env, struct obd_export *exp,
331                       struct obdo *oa)
332 {
333         struct ptlrpc_request *req;
334         struct ost_body       *body;
335         int                    rc;
336         ENTRY;
337
338         LASSERT(oa != NULL);
339         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
340         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
341
342         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
343         if (req == NULL)
344                 GOTO(out, rc = -ENOMEM);
345
346         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
347         if (rc) {
348                 ptlrpc_request_free(req);
349                 GOTO(out, rc);
350         }
351
352         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
353         LASSERT(body);
354
355         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
356
357         ptlrpc_request_set_replen(req);
358
359         rc = ptlrpc_queue_wait(req);
360         if (rc)
361                 GOTO(out_req, rc);
362
363         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364         if (body == NULL)
365                 GOTO(out_req, rc = -EPROTO);
366
367         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
368         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
369
370         oa->o_blksize = cli_brw_size(exp->exp_obd);
371         oa->o_valid |= OBD_MD_FLBLKSZ;
372
373         CDEBUG(D_HA, "transno: %lld\n",
374                lustre_msg_get_transno(req->rq_repmsg));
375 out_req:
376         ptlrpc_req_finished(req);
377 out:
378         RETURN(rc);
379 }
380
381 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
382                    obd_enqueue_update_f upcall, void *cookie)
383 {
384         struct ptlrpc_request *req;
385         struct osc_setattr_args *sa;
386         struct obd_import *imp = class_exp2cliimp(exp);
387         struct ost_body *body;
388         int rc;
389
390         ENTRY;
391
392         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
393         if (req == NULL)
394                 RETURN(-ENOMEM);
395
396         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
397         if (rc < 0) {
398                 ptlrpc_request_free(req);
399                 RETURN(rc);
400         }
401
402         osc_set_io_portal(req);
403
404         ptlrpc_at_set_req_timeout(req);
405
406         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
407
408         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
409
410         ptlrpc_request_set_replen(req);
411
412         req->rq_interpret_reply = osc_setattr_interpret;
413         sa = ptlrpc_req_async_args(sa, req);
414         sa->sa_oa = oa;
415         sa->sa_upcall = upcall;
416         sa->sa_cookie = cookie;
417
418         ptlrpcd_add_req(req);
419
420         RETURN(0);
421 }
422 EXPORT_SYMBOL(osc_punch_send);
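/*
 * Usage sketch (illustrative only): the punched byte range is assumed here
 * to travel in oa->o_size/o_blocks, by analogy with the "overload the size
 * and blocks fields" start/end convention noted in osc_sync_base() below.
 * example_* names are hypothetical.
 */
#if 0
static int example_punch_done(void *cookie, int rc)
{
	return rc;
}

static int example_punch(struct obd_export *exp, struct obdo *oa,
			 u64 start, u64 end)
{
	/* assumption: o_size = start, o_blocks = end of the punched range */
	oa->o_size = start;
	oa->o_blocks = end;
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

	return osc_punch_send(exp, oa, example_punch_done, NULL);
}
#endif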
423
424 /**
425  * osc_fallocate_base() - Handles fallocate requests.
426  *
427  * @exp:        Export structure
428  * @oa:         Attributes passed to OSS from client (obdo structure)
429  * @upcall:     Completion callback invoked when the request finishes
430  * @cookie:     Opaque caller data passed to @upcall
431  * @mode:       Operation done on the given range (fallocate mode flags)
432  *
433  * Only block allocation and the standard preallocate operation are
434  * supported currently; other fallocate mode flags are not supported
435  * yet.  ftruncate(2) and truncate(2) are handled via a SETATTR request
436  * instead, not through this path.  See the usage sketch after this
437  * function.
438  *
439  * Return: Negative errno on failure and 0 on success.
440  */
441 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
442                        obd_enqueue_update_f upcall, void *cookie, int mode)
443 {
444         struct ptlrpc_request *req;
445         struct osc_setattr_args *sa;
446         struct ost_body *body;
447         struct obd_import *imp = class_exp2cliimp(exp);
448         int rc;
449         ENTRY;
450
451         oa->o_falloc_mode = mode;
452         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
453                                    &RQF_OST_FALLOCATE);
454         if (req == NULL)
455                 RETURN(-ENOMEM);
456
457         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
458         if (rc != 0) {
459                 ptlrpc_request_free(req);
460                 RETURN(rc);
461         }
462
463         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464         LASSERT(body);
465
466         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
467
468         ptlrpc_request_set_replen(req);
469
470         req->rq_interpret_reply = osc_setattr_interpret;
471         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
472         sa = ptlrpc_req_async_args(sa, req);
473         sa->sa_oa = oa;
474         sa->sa_upcall = upcall;
475         sa->sa_cookie = cookie;
476
477         ptlrpcd_add_req(req);
478
479         RETURN(0);
480 }
481 EXPORT_SYMBOL(osc_fallocate_base);
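/*
 * Usage sketch (illustrative only): a preallocation request as described in
 * the comment above osc_fallocate_base().  The o_size/o_blocks range
 * encoding follows the same start/end convention assumed for punch above;
 * example_* names are hypothetical.
 */
#if 0
static int example_prealloc_done(void *cookie, int rc)
{
	return rc;
}

static int example_preallocate(struct obd_export *exp, struct obdo *oa,
			       u64 start, u64 end)
{
	oa->o_size = start;
	oa->o_blocks = end;
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

	/* mode is a fallocate(2) flag; only plain preallocation (0) and
	 * FALLOC_FL_KEEP_SIZE are supported per the comment above */
	return osc_fallocate_base(exp, oa, example_prealloc_done, NULL,
				  FALLOC_FL_KEEP_SIZE);
}
#endif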
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req, void *args, int rc)
485 {
486         struct osc_fsync_args *fa = args;
487         struct ost_body *body;
488         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
489         unsigned long valid = 0;
490         struct cl_object *obj;
491         ENTRY;
492
493         if (rc != 0)
494                 GOTO(out, rc);
495
496         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
497         if (body == NULL) {
498                 CERROR("can't unpack ost_body\n");
499                 GOTO(out, rc = -EPROTO);
500         }
501
502         *fa->fa_oa = body->oa;
503         obj = osc2cl(fa->fa_obj);
504
505         /* Update osc object's blocks attribute */
506         cl_object_attr_lock(obj);
507         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
508                 attr->cat_blocks = body->oa.o_blocks;
509                 valid |= CAT_BLOCKS;
510         }
511
512         if (valid != 0)
513                 cl_object_attr_update(env, obj, attr, valid);
514         cl_object_attr_unlock(obj);
515
516 out:
517         rc = fa->fa_upcall(fa->fa_cookie, rc);
518         RETURN(rc);
519 }
520
521 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
522                   obd_enqueue_update_f upcall, void *cookie,
523                   struct ptlrpc_request_set *rqset)
524 {
525         struct obd_export     *exp = osc_export(obj);
526         struct ptlrpc_request *req;
527         struct ost_body       *body;
528         struct osc_fsync_args *fa;
529         int                    rc;
530         ENTRY;
531
532         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
533         if (req == NULL)
534                 RETURN(-ENOMEM);
535
536         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 RETURN(rc);
540         }
541
542         /* overload the size and blocks fields in the oa with start/end */
543         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544         LASSERT(body);
545         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
546
547         ptlrpc_request_set_replen(req);
548         req->rq_interpret_reply = osc_sync_interpret;
549
550         fa = ptlrpc_req_async_args(fa, req);
551         fa->fa_obj = obj;
552         fa->fa_oa = oa;
553         fa->fa_upcall = upcall;
554         fa->fa_cookie = cookie;
555
556         ptlrpc_set_add_req(rqset, req);
557
558         RETURN(0);
559 }
560
561 /* Find and cancel locally the locks matched by @mode in the resource
562  * identified by @oa's object id. Found locks are added to the @cancels
563  * list. Returns the number of locks added. */
564 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
565                                    struct list_head *cancels,
566                                    enum ldlm_mode mode, __u64 lock_flags)
567 {
568         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
569         struct ldlm_res_id res_id;
570         struct ldlm_resource *res;
571         int count;
572         ENTRY;
573
574         /* Return early, i.e. cancel nothing, only if ELC is supported
575          * (flag in the export) but disabled through procfs (flag in the NS).
576          *
577          * This is distinct from the case where ELC was never supported:
578          * there we still want to cancel locks in advance and just cancel
579          * them locally, without sending any RPC. */
580         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
581                 RETURN(0);
582
583         ostid_build_res_name(&oa->o_oi, &res_id);
584         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         if (IS_ERR(res))
586                 RETURN(0);
587
588         LDLM_RESOURCE_ADDREF(res);
589         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
590                                            lock_flags, 0, NULL);
591         LDLM_RESOURCE_DELREF(res);
592         ldlm_resource_putref(res);
593         RETURN(count);
594 }
595
596 static int osc_destroy_interpret(const struct lu_env *env,
597                                  struct ptlrpc_request *req, void *args, int rc)
598 {
599         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
600
601         atomic_dec(&cli->cl_destroy_in_flight);
602         wake_up(&cli->cl_destroy_waitq);
603
604         return 0;
605 }
606
607 static int osc_can_send_destroy(struct client_obd *cli)
608 {
609         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
610             cli->cl_max_rpcs_in_flight) {
611                 /* The destroy request can be sent */
612                 return 1;
613         }
614         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
615             cli->cl_max_rpcs_in_flight) {
616                 /*
617                  * The counter has been modified between the two atomic
618                  * operations.
619                  */
620                 wake_up(&cli->cl_destroy_waitq);
621         }
622         return 0;
623 }
624
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa)
627 {
628         struct client_obd     *cli = &exp->exp_obd->u.cli;
629         struct ptlrpc_request *req;
630         struct ost_body       *body;
631         LIST_HEAD(cancels);
632         int rc, count;
633         ENTRY;
634
635         if (!oa) {
636                 CDEBUG(D_INFO, "oa NULL\n");
637                 RETURN(-EINVAL);
638         }
639
640         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
641                                         LDLM_FL_DISCARD_DATA);
642
643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
644         if (req == NULL) {
645                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
646                 RETURN(-ENOMEM);
647         }
648
649         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
650                                0, &cancels, count);
651         if (rc) {
652                 ptlrpc_request_free(req);
653                 RETURN(rc);
654         }
655
656         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
657         ptlrpc_at_set_req_timeout(req);
658
659         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
660         LASSERT(body);
661         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
662
663         ptlrpc_request_set_replen(req);
664
665         req->rq_interpret_reply = osc_destroy_interpret;
666         if (!osc_can_send_destroy(cli)) {
667                 /*
668                  * Wait until the number of on-going destroy RPCs drops
669                  * under max_rpc_in_flight
670                  */
671                 rc = l_wait_event_abortable_exclusive(
672                         cli->cl_destroy_waitq,
673                         osc_can_send_destroy(cli));
674                 if (rc) {
675                         ptlrpc_req_finished(req);
676                         RETURN(-EINTR);
677                 }
678         }
679
680         /* Do not wait for response */
681         ptlrpcd_add_req(req);
682         RETURN(0);
683 }
684
685 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
686                                 long writing_bytes)
687 {
688         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
689
690         LASSERT(!(oa->o_valid & bits));
691
692         oa->o_valid |= bits;
693         spin_lock(&cli->cl_loi_list_lock);
694         if (cli->cl_ocd_grant_param)
695                 oa->o_dirty = cli->cl_dirty_grant;
696         else
697                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
698         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
699                 CERROR("dirty %lu > dirty_max %lu\n",
700                        cli->cl_dirty_pages,
701                        cli->cl_dirty_max_pages);
702                 oa->o_undirty = 0;
703         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
704                             (long)(obd_max_dirty_pages + 1))) {
705                 /* The atomic_read() and the atomic_inc() are not covered
706                  * by a lock, thus they may safely race and trip this
707                  * CERROR() unless we add in a small fudge factor (+1). */
708                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
709                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
710                        obd_max_dirty_pages);
711                 oa->o_undirty = 0;
712         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
713                             0x7fffffff)) {
714                 CERROR("dirty %lu - dirty_max %lu too big???\n",
715                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
716                 oa->o_undirty = 0;
717         } else {
718                 unsigned long nrpages;
719                 unsigned long undirty;
720
721                 nrpages = cli->cl_max_pages_per_rpc;
722                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
723                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
724                 undirty = nrpages << PAGE_SHIFT;
725                 if (cli->cl_ocd_grant_param) {
726                         int nrextents;
727
728                         /* take extent tax into account when asking for more
729                          * grant space */
730                         nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
731                                      cli->cl_max_extent_pages;
732                         undirty += nrextents * cli->cl_grant_extent_tax;
733                 }
734                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
735                  * to add extent tax, etc.
736                  */
737                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
738                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
739         }
740         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
741         /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
742         if (cli->cl_lost_grant > INT_MAX) {
743                 CDEBUG(D_CACHE,
744                       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
745                       cli_name(cli), cli->cl_lost_grant);
746                 oa->o_dropped = INT_MAX;
747         } else {
748                 oa->o_dropped = cli->cl_lost_grant;
749         }
750         cli->cl_lost_grant -= oa->o_dropped;
751         spin_unlock(&cli->cl_loi_list_lock);
752         CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
753                " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
754                oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
755 }
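/*
 * Worked example for the o_undirty computation above (illustrative
 * numbers): with cl_max_pages_per_rpc = 256, cl_max_rpcs_in_flight = 8,
 * 4 KiB pages and cl_dirty_max_pages below that product, nrpages =
 * 256 * (8 + 1) = 2304, so the client asks for undirty = 2304 << 12 =
 * 9 MiB of grant, plus the per-extent tax when GRANT_PARAM was
 * negotiated, capped by the OBD_MAX_GRANT-based limit.
 */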
756
757 void osc_update_next_shrink(struct client_obd *cli)
758 {
759         cli->cl_next_shrink_grant = ktime_get_seconds() +
760                                     cli->cl_grant_shrink_interval;
761
762         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
763                cli->cl_next_shrink_grant);
764 }
765
766 static void __osc_update_grant(struct client_obd *cli, u64 grant)
767 {
768         spin_lock(&cli->cl_loi_list_lock);
769         cli->cl_avail_grant += grant;
770         spin_unlock(&cli->cl_loi_list_lock);
771 }
772
773 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 {
775         if (body->oa.o_valid & OBD_MD_FLGRANT) {
776                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
777                 __osc_update_grant(cli, body->oa.o_grant);
778         }
779 }
780
781 /**
782  * grant thread data for shrinking space.
783  */
784 struct grant_thread_data {
785         struct list_head        gtd_clients;
786         struct mutex            gtd_mutex;
787         unsigned long           gtd_stopped:1;
788 };
789 static struct grant_thread_data client_gtd;
790
791 static int osc_shrink_grant_interpret(const struct lu_env *env,
792                                       struct ptlrpc_request *req,
793                                       void *args, int rc)
794 {
795         struct osc_grant_args *aa = args;
796         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
797         struct ost_body *body;
798
799         if (rc != 0) {
800                 __osc_update_grant(cli, aa->aa_oa->o_grant);
801                 GOTO(out, rc);
802         }
803
804         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
805         LASSERT(body);
806         osc_update_grant(cli, body);
807 out:
808         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
809         aa->aa_oa = NULL;
810
811         return rc;
812 }
813
814 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
815 {
816         spin_lock(&cli->cl_loi_list_lock);
817         oa->o_grant = cli->cl_avail_grant / 4;
818         cli->cl_avail_grant -= oa->o_grant;
819         spin_unlock(&cli->cl_loi_list_lock);
820         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
821                 oa->o_valid |= OBD_MD_FLFLAGS;
822                 oa->o_flags = 0;
823         }
824         oa->o_flags |= OBD_FL_SHRINK_GRANT;
825         osc_update_next_shrink(cli);
826 }
827
828 /* Shrink the current grant, either from some large amount to enough for a
829  * full set of in-flight RPCs, or if we have already shrunk to that limit
830  * then to enough for a single RPC.  This avoids keeping more grant than
831  * needed, and avoids shrinking the grant piecemeal. */
832 static int osc_shrink_grant(struct client_obd *cli)
833 {
834         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
835                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
836
837         spin_lock(&cli->cl_loi_list_lock);
838         if (cli->cl_avail_grant <= target_bytes)
839                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
840         spin_unlock(&cli->cl_loi_list_lock);
841
842         return osc_shrink_grant_to_target(cli, target_bytes);
843 }
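/*
 * Worked example (illustrative numbers): with cl_max_rpcs_in_flight = 8,
 * cl_max_pages_per_rpc = 256 and 4 KiB pages, the first-stage target is
 * (8 + 1) * 1 MiB = 9 MiB; once cl_avail_grant is already at or below
 * that, the target drops to a single RPC's worth, 1 MiB.
 */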
844
845 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
846 {
847         int                     rc = 0;
848         struct ost_body        *body;
849         ENTRY;
850
851         spin_lock(&cli->cl_loi_list_lock);
852         /* Don't shrink if we are already at or below the desired limit.
853          * We don't want to shrink below a single RPC, as that will negatively
854          * impact block allocation and long-term performance. */
855         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
856                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
857
858         if (target_bytes >= cli->cl_avail_grant) {
859                 spin_unlock(&cli->cl_loi_list_lock);
860                 RETURN(0);
861         }
862         spin_unlock(&cli->cl_loi_list_lock);
863
864         OBD_ALLOC_PTR(body);
865         if (!body)
866                 RETURN(-ENOMEM);
867
868         osc_announce_cached(cli, &body->oa, 0);
869
870         spin_lock(&cli->cl_loi_list_lock);
871         if (target_bytes >= cli->cl_avail_grant) {
872                 /* available grant has changed since target calculation */
873                 spin_unlock(&cli->cl_loi_list_lock);
874                 GOTO(out_free, rc = 0);
875         }
876         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
877         cli->cl_avail_grant = target_bytes;
878         spin_unlock(&cli->cl_loi_list_lock);
879         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
880                 body->oa.o_valid |= OBD_MD_FLFLAGS;
881                 body->oa.o_flags = 0;
882         }
883         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
884         osc_update_next_shrink(cli);
885
886         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
887                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
888                                 sizeof(*body), body, NULL);
889         if (rc != 0)
890                 __osc_update_grant(cli, body->oa.o_grant);
891 out_free:
892         OBD_FREE_PTR(body);
893         RETURN(rc);
894 }
895
896 static int osc_should_shrink_grant(struct client_obd *client)
897 {
898         time64_t next_shrink = client->cl_next_shrink_grant;
899
900         if (client->cl_import == NULL)
901                 return 0;
902
903         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
904             client->cl_import->imp_grant_shrink_disabled) {
905                 osc_update_next_shrink(client);
906                 return 0;
907         }
908
909         if (ktime_get_seconds() >= next_shrink - 5) {
910                 /* Get the current RPC size directly, instead of going via:
911                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
912                  * Keep comment here so that it can be found by searching. */
913                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
914
915                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
916                     client->cl_avail_grant > brw_size)
917                         return 1;
918                 else
919                         osc_update_next_shrink(client);
920         }
921         return 0;
922 }
923
924 #define GRANT_SHRINK_RPC_BATCH  100
925
926 static struct delayed_work work;
927
928 static void osc_grant_work_handler(struct work_struct *data)
929 {
930         struct client_obd *cli;
931         int rpc_sent;
932         bool init_next_shrink = true;
933         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
934
935         rpc_sent = 0;
936         mutex_lock(&client_gtd.gtd_mutex);
937         list_for_each_entry(cli, &client_gtd.gtd_clients,
938                             cl_grant_chain) {
939                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
940                     osc_should_shrink_grant(cli)) {
941                         osc_shrink_grant(cli);
942                         rpc_sent++;
943                 }
944
945                 if (!init_next_shrink) {
946                         if (cli->cl_next_shrink_grant < next_shrink &&
947                             cli->cl_next_shrink_grant > ktime_get_seconds())
948                                 next_shrink = cli->cl_next_shrink_grant;
949                 } else {
950                         init_next_shrink = false;
951                         next_shrink = cli->cl_next_shrink_grant;
952                 }
953         }
954         mutex_unlock(&client_gtd.gtd_mutex);
955
956         if (client_gtd.gtd_stopped == 1)
957                 return;
958
959         if (next_shrink > ktime_get_seconds()) {
960                 time64_t delay = next_shrink - ktime_get_seconds();
961
962                 schedule_delayed_work(&work, cfs_time_seconds(delay));
963         } else {
964                 schedule_work(&work.work);
965         }
966 }
967
968 void osc_schedule_grant_work(void)
969 {
970         cancel_delayed_work_sync(&work);
971         schedule_work(&work.work);
972 }
973
974 /**
975  * Start grant work for returning grant to the server for idle clients.
976  */
977 static int osc_start_grant_work(void)
978 {
979         client_gtd.gtd_stopped = 0;
980         mutex_init(&client_gtd.gtd_mutex);
981         INIT_LIST_HEAD(&client_gtd.gtd_clients);
982
983         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
984         schedule_work(&work.work);
985
986         return 0;
987 }
988
989 static void osc_stop_grant_work(void)
990 {
991         client_gtd.gtd_stopped = 1;
992         cancel_delayed_work_sync(&work);
993 }
994
995 static void osc_add_grant_list(struct client_obd *client)
996 {
997         mutex_lock(&client_gtd.gtd_mutex);
998         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
999         mutex_unlock(&client_gtd.gtd_mutex);
1000 }
1001
1002 static void osc_del_grant_list(struct client_obd *client)
1003 {
1004         if (list_empty(&client->cl_grant_chain))
1005                 return;
1006
1007         mutex_lock(&client_gtd.gtd_mutex);
1008         list_del_init(&client->cl_grant_chain);
1009         mutex_unlock(&client_gtd.gtd_mutex);
1010 }
1011
1012 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1013 {
1014         /*
1015          * ocd_grant is the total grant amount we're expected to hold: if we've
1016          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1017          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1018          * dirty.
1019          *
1020          * race is tolerable here: if we're evicted, but imp_state already
1021          * left EVICTED state, then cl_dirty_pages must be 0 already.
1022          */
1023         spin_lock(&cli->cl_loi_list_lock);
1024         cli->cl_avail_grant = ocd->ocd_grant;
1025         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1026                 unsigned long consumed = cli->cl_reserved_grant;
1027
1028                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1029                         consumed += cli->cl_dirty_grant;
1030                 else
1031                         consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1032                 if (cli->cl_avail_grant < consumed) {
1033                         CERROR("%s: granted %ld but already consumed %ld\n",
1034                                cli_name(cli), cli->cl_avail_grant, consumed);
1035                         cli->cl_avail_grant = 0;
1036                 } else {
1037                         cli->cl_avail_grant -= consumed;
1038                 }
1039         }
1040
1041         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1042                 u64 size;
1043                 int chunk_mask;
1044
1045                 /* overhead for each extent insertion */
1046                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1047                 /* determine the appropriate chunk size used by osc_extent. */
1048                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1049                                           ocd->ocd_grant_blkbits);
1050                 /* max_pages_per_rpc must be chunk aligned */
1051                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1052                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1053                                              ~chunk_mask) & chunk_mask;
1054                 /* determine maximum extent size, in #pages */
1055                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1056                 cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
1057                 cli->cl_ocd_grant_param = 1;
1058         } else {
1059                 cli->cl_ocd_grant_param = 0;
1060                 cli->cl_grant_extent_tax = 0;
1061                 cli->cl_chunkbits = PAGE_SHIFT;
1062                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1063         }
1064         spin_unlock(&cli->cl_loi_list_lock);
1065
1066         CDEBUG(D_CACHE,
1067                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1068                cli_name(cli),
1069                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1070                cli->cl_max_extent_pages);
1071
1072         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1073                 osc_add_grant_list(cli);
1074 }
1075 EXPORT_SYMBOL(osc_init_grant);
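/*
 * Worked example for the chunk alignment in osc_init_grant() (illustrative
 * numbers): with ocd_grant_blkbits = 16 and PAGE_SHIFT = 12, cl_chunkbits
 * is 16, a chunk is 1 << (16 - 12) = 16 pages, and chunk_mask = ~15; a
 * configured cl_max_pages_per_rpc of 100 rounds up to (100 + 15) & ~15 =
 * 112 so every RPC covers a whole number of chunks.
 */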
1076
1077 /* We assume that the reason this OSC got a short read is that it read
1078  * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1079  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1080  * this stripe never got written at or beyond this stripe offset yet. */
1081 static void handle_short_read(int nob_read, size_t page_count,
1082                               struct brw_page **pga)
1083 {
1084         char *ptr;
1085         int i = 0;
1086
1087         /* skip bytes read OK */
1088         while (nob_read > 0) {
1089                 LASSERT(page_count > 0);
1090
1091                 if (pga[i]->count > nob_read) {
1092                         /* EOF inside this page */
1093                         ptr = kmap(pga[i]->pg) +
1094                                 (pga[i]->off & ~PAGE_MASK);
1095                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1096                         kunmap(pga[i]->pg);
1097                         page_count--;
1098                         i++;
1099                         break;
1100                 }
1101
1102                 nob_read -= pga[i]->count;
1103                 page_count--;
1104                 i++;
1105         }
1106
1107         /* zero remaining pages */
1108         while (page_count-- > 0) {
1109                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1110                 memset(ptr, 0, pga[i]->count);
1111                 kunmap(pga[i]->pg);
1112                 i++;
1113         }
1114 }
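/*
 * Worked example (illustrative numbers): for a two-page, 8192-byte read
 * that returns nob_read = 5000, the first page (count 4096) is fully
 * valid, the second page is zeroed from offset 5000 - 4096 = 904 onward,
 * and any further pages would be zeroed entirely.
 */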
1115
1116 static int check_write_rcs(struct ptlrpc_request *req,
1117                            int requested_nob, int niocount,
1118                            size_t page_count, struct brw_page **pga)
1119 {
1120         int     i;
1121         __u32   *remote_rcs;
1122
1123         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1124                                                   sizeof(*remote_rcs) *
1125                                                   niocount);
1126         if (remote_rcs == NULL) {
1127                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1128                 return(-EPROTO);
1129         }
1130
1131         /* return error if any niobuf was in error */
1132         for (i = 0; i < niocount; i++) {
1133                 if ((int)remote_rcs[i] < 0) {
1134                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1135                                i, remote_rcs[i], req);
1136                         return remote_rcs[i];
1137                 }
1138
1139                 if (remote_rcs[i] != 0) {
1140                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1141                                 i, remote_rcs[i], req);
1142                         return(-EPROTO);
1143                 }
1144         }
1145         if (req->rq_bulk != NULL &&
1146             req->rq_bulk->bd_nob_transferred != requested_nob) {
1147                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1148                        req->rq_bulk->bd_nob_transferred, requested_nob);
1149                 return(-EPROTO);
1150         }
1151
1152         return 0;
1153 }
1154
1155 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1156 {
1157         if (p1->flag != p2->flag) {
1158                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1159                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1160                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1161
1162                 /* warn if we try to combine flags that we don't know to be
1163                  * safe to combine */
1164                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1165                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1166                               "report this at https://jira.whamcloud.com/\n",
1167                               p1->flag, p2->flag);
1168                 }
1169                 return 0;
1170         }
1171
1172         return (p1->off + p1->count == p2->off);
1173 }
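/*
 * Illustrative example for can_merge_pages(): two pages coalesce into one
 * niobuf only when byte-contiguous (p1->off + p1->count == p2->off) with
 * identical flags.  Pages at offsets 0 and 4096, each with count 4096 and
 * equal flags, merge; the same pair differing only in OBD_BRW_SYNC does
 * not merge, and only differences outside the known-safe mask also warn.
 */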
1174
1175 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1176 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1177                                    size_t pg_count, struct brw_page **pga,
1178                                    int opc, obd_dif_csum_fn *fn,
1179                                    int sector_size,
1180                                    u32 *check_sum)
1181 {
1182         struct ahash_request *req;
1183         /* Use Adler as the default checksum type on top of DIF tags */
1184         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1185         struct page *__page;
1186         unsigned char *buffer;
1187         __u16 *guard_start;
1188         unsigned int bufsize;
1189         int guard_number;
1190         int used_number = 0;
1191         int used;
1192         u32 cksum;
1193         int rc = 0;
1194         int i = 0;
1195
1196         LASSERT(pg_count > 0);
1197
1198         __page = alloc_page(GFP_KERNEL);
1199         if (__page == NULL)
1200                 return -ENOMEM;
1201
1202         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1203         if (IS_ERR(req)) {
1204                 rc = PTR_ERR(req);
1205                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1206                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1207                 GOTO(out, rc);
1208         }
1209
1210         buffer = kmap(__page);
1211         guard_start = (__u16 *)buffer;
1212         guard_number = PAGE_SIZE / sizeof(*guard_start);
1213         while (nob > 0 && pg_count > 0) {
1214                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1215
1216                 /* corrupt the data before we compute the checksum, to
1217                  * simulate an OST->client data error */
1218                 if (unlikely(i == 0 && opc == OST_READ &&
1219                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1220                         unsigned char *ptr = kmap(pga[i]->pg);
1221                         int off = pga[i]->off & ~PAGE_MASK;
1222
1223                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1224                         kunmap(pga[i]->pg);
1225                 }
1226
1227                 /*
1228                  * The remaining guard slots should be able to hold the
1229                  * checksums of a whole page
1230                  */
1231                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1232                                                   pga[i]->off & ~PAGE_MASK,
1233                                                   count,
1234                                                   guard_start + used_number,
1235                                                   guard_number - used_number,
1236                                                   &used, sector_size,
1237                                                   fn);
1238                 if (rc)
1239                         break;
1240
1241                 used_number += used;
1242                 if (used_number == guard_number) {
1243                         cfs_crypto_hash_update_page(req, __page, 0,
1244                                 used_number * sizeof(*guard_start));
1245                         used_number = 0;
1246                 }
1247
1248                 nob -= pga[i]->count;
1249                 pg_count--;
1250                 i++;
1251         }
1252         kunmap(__page);
1253         if (rc)
1254                 GOTO(out, rc);
1255
1256         if (used_number != 0)
1257                 cfs_crypto_hash_update_page(req, __page, 0,
1258                         used_number * sizeof(*guard_start));
1259
1260         bufsize = sizeof(cksum);
1261         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1262
1263         /* For sending we only compute the wrong checksum instead
1264          * of corrupting the data, so it is still correct on a redo */
1265         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1266                 cksum++;
1267
1268         *check_sum = cksum;
1269 out:
1270         __free_page(__page);
1271         return rc;
1272 }
1273 #else /* !CONFIG_CRC_T10DIF */
1274 #define obd_dif_ip_fn NULL
1275 #define obd_dif_crc_fn NULL
1276 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1277         -EOPNOTSUPP
1278 #endif /* CONFIG_CRC_T10DIF */
1279
1280 static int osc_checksum_bulk(int nob, size_t pg_count,
1281                              struct brw_page **pga, int opc,
1282                              enum cksum_types cksum_type,
1283                              u32 *cksum)
1284 {
1285         int                             i = 0;
1286         struct ahash_request           *req;
1287         unsigned int                    bufsize;
1288         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1289
1290         LASSERT(pg_count > 0);
1291
1292         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1293         if (IS_ERR(req)) {
1294                 CERROR("Unable to initialize checksum hash %s\n",
1295                        cfs_crypto_hash_name(cfs_alg));
1296                 return PTR_ERR(req);
1297         }
1298
1299         while (nob > 0 && pg_count > 0) {
1300                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1301
1302                 /* corrupt the data before we compute the checksum, to
1303                  * simulate an OST->client data error */
1304                 if (i == 0 && opc == OST_READ &&
1305                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1306                         unsigned char *ptr = kmap(pga[i]->pg);
1307                         int off = pga[i]->off & ~PAGE_MASK;
1308
1309                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1310                         kunmap(pga[i]->pg);
1311                 }
1312                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1313                                             pga[i]->off & ~PAGE_MASK,
1314                                             count);
1315                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1316                                (int)(pga[i]->off & ~PAGE_MASK));
1317
1318                 nob -= pga[i]->count;
1319                 pg_count--;
1320                 i++;
1321         }
1322
1323         bufsize = sizeof(*cksum);
1324         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1325
1326         /* For sending we only compute the wrong checksum instead
1327          * of corrupting the data, so it is still correct on a redo */
1328         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1329                 (*cksum)++;
1330
1331         return 0;
1332 }
1333
1334 static int osc_checksum_bulk_rw(const char *obd_name,
1335                                 enum cksum_types cksum_type,
1336                                 int nob, size_t pg_count,
1337                                 struct brw_page **pga, int opc,
1338                                 u32 *check_sum)
1339 {
1340         obd_dif_csum_fn *fn = NULL;
1341         int sector_size = 0;
1342         int rc;
1343
1344         ENTRY;
1345         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1346
1347         if (fn)
1348                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1349                                              opc, fn, sector_size, check_sum);
1350         else
1351                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1352                                        check_sum);
1353
1354         RETURN(rc);
1355 }
1356
1357 static inline void osc_release_bounce_pages(struct brw_page **pga,
1358                                             u32 page_count)
1359 {
1360 #ifdef HAVE_LUSTRE_CRYPTO
1361         int i;
1362
1363         for (i = 0; i < page_count; i++) {
1364                 /* Bounce pages allocated by a call to
1365                  * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
1366                  * are identified thanks to the PageChecked flag.
1367                  */
1368                 if (PageChecked(pga[i]->pg))
1369                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1370                 pga[i]->count -= pga[i]->bp_count_diff;
1371                 pga[i]->off += pga[i]->bp_off_diff;
1372         }
1373 #endif
1374 }
1375
1376 static int
1377 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1378                      u32 page_count, struct brw_page **pga,
1379                      struct ptlrpc_request **reqp, int resend)
1380 {
1381         struct ptlrpc_request *req;
1382         struct ptlrpc_bulk_desc *desc;
1383         struct ost_body *body;
1384         struct obd_ioobj *ioobj;
1385         struct niobuf_remote *niobuf;
1386         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1387         struct osc_brw_async_args *aa;
1388         struct req_capsule *pill;
1389         struct brw_page *pg_prev;
1390         void *short_io_buf;
1391         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1392         struct inode *inode = NULL;
1393         bool directio = false;
1394
1395         ENTRY;
1396         if (pga[0]->pg) {
1397                 inode = page2inode(pga[0]->pg);
1398                 if (inode == NULL) {
1399                         /* Try to get reference to inode from cl_page if we are
1400                          * dealing with direct IO, as handled pages are not
1401                          * actual page cache pages.
1402                          */
1403                         struct osc_async_page *oap = brw_page2oap(pga[0]);
1404                         struct cl_page *clpage = oap2cl_page(oap);
1405
1406                         inode = clpage->cp_inode;
1407                         if (inode)
1408                                 directio = true;
1409                 }
1410         }
1411         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1412                 RETURN(-ENOMEM); /* Recoverable */
1413         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1414                 RETURN(-EINVAL); /* Fatal */
1415
1416         if ((cmd & OBD_BRW_WRITE) != 0) {
1417                 opc = OST_WRITE;
1418                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1419                                                 osc_rq_pool,
1420                                                 &RQF_OST_BRW_WRITE);
1421         } else {
1422                 opc = OST_READ;
1423                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1424         }
1425         if (req == NULL)
1426                 RETURN(-ENOMEM);
1427
1428         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1429                 for (i = 0; i < page_count; i++) {
1430                         struct brw_page *pg = pga[i];
1431                         struct page *data_page = NULL;
1432                         bool retried = false;
1433                         bool lockedbymyself;
1434                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1435                         struct address_space *map_orig = NULL;
1436                         pgoff_t index_orig;
1437
1438 retry_encrypt:
1439                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1440                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1441                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1442                         /* The page can already be locked when we arrive here.
1443                          * This is possible when cl_page_assume/vvp_page_assume
1444                          * is stuck on wait_on_page_writeback with page lock
1445                          * held. In this case there is no risk for the lock to
1446                          * be released while we are doing our encryption
1447                          * processing, because writeback against that page will
1448                          * end in vvp_page_completion_write/cl_page_completion,
1449                          * which means only once the page is fully processed.
1450                          */
1451                         lockedbymyself = trylock_page(pg->pg);
1452                         if (directio) {
1453                                 map_orig = pg->pg->mapping;
1454                                 pg->pg->mapping = inode->i_mapping;
1455                                 index_orig = pg->pg->index;
1456                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1457                         }
1458                         data_page =
1459                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1460                                                                  nunits, 0,
1461                                                                  GFP_NOFS);
1462                         if (directio) {
1463                                 pg->pg->mapping = map_orig;
1464                                 pg->pg->index = index_orig;
1465                         }
1466                         if (lockedbymyself)
1467                                 unlock_page(pg->pg);
1468                         if (IS_ERR(data_page)) {
1469                                 rc = PTR_ERR(data_page);
1470                                 if (rc == -ENOMEM && !retried) {
1471                                         retried = true;
1472                                         rc = 0;
1473                                         goto retry_encrypt;
1474                                 }
1475                                 ptlrpc_request_free(req);
1476                                 RETURN(rc);
1477                         }
1478                         /* Set PageChecked flag on bounce page for
1479                          * disambiguation in osc_release_bounce_pages().
1480                          */
1481                         SetPageChecked(data_page);
1482                         pg->pg = data_page;
1483                         /* there should be no gap in the middle of the page array */
1484                         if (i == page_count - 1) {
1485                                 struct osc_async_page *oap = brw_page2oap(pg);
1486
1487                                 oa->o_size = oap->oap_count +
1488                                         oap->oap_obj_off + oap->oap_page_off;
1489                         }
1490                         /* len is forced to nunits, and the relative offset
1491                          * to 0, so store the old clear-text values
1492                          */
1493                         pg->bp_count_diff = nunits - pg->count;
1494                         pg->count = nunits;
1495                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1496                         pg->off = pg->off & PAGE_MASK;
1497                 }
1498         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1499                 for (i = 0; i < page_count; i++) {
1500                         struct brw_page *pg = pga[i];
1501                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1502
1503                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1504                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1505                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1506                         /* count/off are forced to cover a whole encryption
1507                          * unit so that all encrypted data is read from the
1508                          * OST, so adjust bp_{count,off}_diff for the size of
1509                          * the clear text.
1510                          */
1511                         pg->bp_count_diff = nunits - pg->count;
1512                         pg->count = nunits;
1513                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1514                         pg->off = pg->off & PAGE_MASK;
1515                 }
1516         }
1517
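        /* Contiguous pages share a single remote niobuf, so count how many
         * distinct niobufs this transfer actually needs. */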
1518         for (niocount = i = 1; i < page_count; i++) {
1519                 if (!can_merge_pages(pga[i - 1], pga[i]))
1520                         niocount++;
1521         }
1522
1523         pill = &req->rq_pill;
1524         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1525                              sizeof(*ioobj));
1526         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1527                              niocount * sizeof(*niobuf));
1528
1529         for (i = 0; i < page_count; i++) {
1530                 short_io_size += pga[i]->count;
1531                 if (!inode || !IS_ENCRYPTED(inode)) {
1532                         pga[i]->bp_count_diff = 0;
1533                         pga[i]->bp_off_diff = 0;
1534                 }
1535         }
1536
1537         /* Check if read/write is small enough to be a short io. */
1538         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1539             !imp_connect_shortio(cli->cl_import))
1540                 short_io_size = 0;
1541
1542         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1543                              opc == OST_READ ? 0 : short_io_size);
1544         if (opc == OST_READ)
1545                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1546                                      short_io_size);
1547
1548         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1549         if (rc) {
1550                 ptlrpc_request_free(req);
1551                 RETURN(rc);
1552         }
1553         osc_set_io_portal(req);
1554
1555         ptlrpc_at_set_req_timeout(req);
1556         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1557          * retry logic */
1558         req->rq_no_retry_einprogress = 1;
1559
1560         if (short_io_size != 0) {
1561                 desc = NULL;
1562                 short_io_buf = NULL;
1563                 goto no_bulk;
1564         }
1565
1566         desc = ptlrpc_prep_bulk_imp(req, page_count,
1567                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1568                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1569                         PTLRPC_BULK_PUT_SINK),
1570                 OST_BULK_PORTAL,
1571                 &ptlrpc_bulk_kiov_pin_ops);
1572
1573         if (desc == NULL)
1574                 GOTO(out, rc = -ENOMEM);
1575         /* NB the request now owns desc and will free it when the request is freed */
1576 no_bulk:
1577         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1578         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1579         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1580         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1581
1582         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1583
1584         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1585          * and from_kgid(), because these RPCs are asynchronous. Fortunately, the
1586          * oa variable contains valid o_uid and o_gid for these two operations.
1587          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1588          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1589          * other processing logic */
1590         body->oa.o_uid = oa->o_uid;
1591         body->oa.o_gid = oa->o_gid;
1592
1593         obdo_to_ioobj(oa, ioobj);
1594         ioobj->ioo_bufcnt = niocount;
1595         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1596          * of bulks that might be sent for this request. The actual number is
1597          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1598          * sends "max - 1" for compatibility with old clients that send "0",
1599          * and also so that the actual maximum is a power of two, not one less. LU-1431 */
1600         if (desc != NULL)
1601                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1602         else /* short io */
1603                 ioobj_max_brw_set(ioobj, 0);
1604
1605         if (short_io_size != 0) {
1606                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1607                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1608                         body->oa.o_flags = 0;
1609                 }
1610                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1611                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1612                        short_io_size);
1613                 if (opc == OST_WRITE) {
1614                         short_io_buf = req_capsule_client_get(pill,
1615                                                               &RMF_SHORT_IO);
1616                         LASSERT(short_io_buf != NULL);
1617                 }
1618         }
1619
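        /* Walk the page array: for short I/O writes, copy the data inline into
         * the request buffer; otherwise attach each page as a bulk fragment.
         * Contiguous pages are merged into a single remote niobuf. */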
1620         LASSERT(page_count > 0);
1621         pg_prev = pga[0];
1622         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1623                 struct brw_page *pg = pga[i];
1624                 int poff = pg->off & ~PAGE_MASK;
1625
1626                 LASSERT(pg->count > 0);
1627                 /* make sure there is no gap in the middle of the page array */
1628                 LASSERTF(page_count == 1 ||
1629                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1630                           ergo(i > 0 && i < page_count - 1,
1631                                poff == 0 && pg->count == PAGE_SIZE)   &&
1632                           ergo(i == page_count - 1, poff == 0)),
1633                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1634                          i, page_count, pg, pg->off, pg->count);
1635                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1636                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1637                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1638                          i, page_count,
1639                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1640                          pg_prev->pg, page_private(pg_prev->pg),
1641                          pg_prev->pg->index, pg_prev->off);
1642                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1643                         (pg->flag & OBD_BRW_SRVLOCK));
1644                 if (short_io_size != 0 && opc == OST_WRITE) {
1645                         unsigned char *ptr = kmap_atomic(pg->pg);
1646
1647                         LASSERT(short_io_size >= requested_nob + pg->count);
1648                         memcpy(short_io_buf + requested_nob,
1649                                ptr + poff,
1650                                pg->count);
1651                         kunmap_atomic(ptr);
1652                 } else if (short_io_size == 0) {
1653                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1654                                                          pg->count);
1655                 }
1656                 requested_nob += pg->count;
1657
1658                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1659                         niobuf--;
1660                         niobuf->rnb_len += pg->count;
1661                 } else {
1662                         niobuf->rnb_offset = pg->off;
1663                         niobuf->rnb_len    = pg->count;
1664                         niobuf->rnb_flags  = pg->flag;
1665                 }
1666                 pg_prev = pg;
1667         }
1668
1669         LASSERTF((void *)(niobuf - niocount) ==
1670                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1671                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1672                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1673
1674         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1675         if (resend) {
1676                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1677                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1678                         body->oa.o_flags = 0;
1679                 }
1680                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1681         }
1682
1683         if (osc_should_shrink_grant(cli))
1684                 osc_shrink_grant_local(cli, &body->oa);
1685
1686         /* size[REQ_REC_OFF] is still sizeof(*body) */
1687         if (opc == OST_WRITE) {
1688                 if (cli->cl_checksum &&
1689                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1690                         /* store cl_cksum_type in a local variable since
1691                          * it can be changed via lprocfs */
1692                         enum cksum_types cksum_type = cli->cl_cksum_type;
1693
1694                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1695                                 body->oa.o_flags = 0;
1696
1697                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1698                                                                 cksum_type);
1699                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1700
1701                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1702                                                   requested_nob, page_count,
1703                                                   pga, OST_WRITE,
1704                                                   &body->oa.o_cksum);
1705                         if (rc < 0) {
1706                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1707                                        rc);
1708                                 GOTO(out, rc);
1709                         }
1710                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1711                                body->oa.o_cksum);
1712
1713                         /* save this in 'oa', too, for later checking */
1714                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1715                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1716                                                            cksum_type);
1717                 } else {
1718                         /* clear out the checksum flag, in case this is a
1719                          * resend but cl_checksum is no longer set. b=11238 */
1720                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1721                 }
1722                 oa->o_cksum = body->oa.o_cksum;
1723                 /* 1 RC per niobuf */
1724                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1725                                      sizeof(__u32) * niocount);
1726         } else {
1727                 if (cli->cl_checksum &&
1728                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1729                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1730                                 body->oa.o_flags = 0;
1731                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1732                                 cli->cl_cksum_type);
1733                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1734                 }
1735
1736                 /* The client cksum has already been copied to the wire obdo in
1737                  * the previous lustre_set_wire_obdo(); in case a bulk read is
1738                  * being resent due to a cksum error, this allows the server to
1739                  * check+dump pages on its side */
1740         }
1741         ptlrpc_request_set_replen(req);
1742
1743         aa = ptlrpc_req_async_args(aa, req);
1744         aa->aa_oa = oa;
1745         aa->aa_requested_nob = requested_nob;
1746         aa->aa_nio_count = niocount;
1747         aa->aa_page_count = page_count;
1748         aa->aa_resends = 0;
1749         aa->aa_ppga = pga;
1750         aa->aa_cli = cli;
1751         INIT_LIST_HEAD(&aa->aa_oaps);
1752
1753         *reqp = req;
1754         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1755         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1756                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1757                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1758         RETURN(0);
1759
1760  out:
1761         ptlrpc_req_finished(req);
1762         RETURN(rc);
1763 }
1764
1765 char dbgcksum_file_name[PATH_MAX];
1766
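/* Dump all pages of a failing bulk transfer to a file for later analysis.
 * The file name encodes the parent fid, the extent and both checksums, and
 * the file is created O_EXCL so only the first occurrence of an error for a
 * given range is kept. */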
1767 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1768                                 struct brw_page **pga, __u32 server_cksum,
1769                                 __u32 client_cksum)
1770 {
1771         struct file *filp;
1772         int rc, i;
1773         unsigned int len;
1774         char *buf;
1775
1776         /* only keep a dump of the pages on the first error for a given range
1777          * in the file/fid, not during resends/retries. */
1778         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1779                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1780                  (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1781                   libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1782                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1783                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1784                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1785                  pga[0]->off,
1786                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1787                  client_cksum, server_cksum);
1788         filp = filp_open(dbgcksum_file_name,
1789                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1790         if (IS_ERR(filp)) {
1791                 rc = PTR_ERR(filp);
1792                 if (rc == -EEXIST)
1793                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1794                                "checksum error: rc = %d\n", dbgcksum_file_name,
1795                                rc);
1796                 else
1797                         CERROR("%s: can't open to dump pages with checksum "
1798                                "error: rc = %d\n", dbgcksum_file_name, rc);
1799                 return;
1800         }
1801
1802         for (i = 0; i < page_count; i++) {
1803                 len = pga[i]->count;
1804                 buf = kmap(pga[i]->pg);
1805                 while (len != 0) {
1806                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1807                         if (rc < 0) {
1808                                 CERROR("%s: wanted to write %u but got %d "
1809                                        "error\n", dbgcksum_file_name, len, rc);
1810                                 break;
1811                         }
1812                         len -= rc;
1813                         buf += rc;
1814                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1815                                dbgcksum_file_name, rc);
1816                 }
1817                 kunmap(pga[i]->pg);
1818         }
1819
1820         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1821         if (rc)
1822                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1823         filp_close(filp, NULL);
1824 }
1825
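/* Compare the client and server write checksums. On mismatch, recompute the
 * checksum over the local pages using the server's checksum type to guess
 * where the corruption happened, log a console error, and return 1 so the
 * caller can resend; return 0 when the checksums match. */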
1826 static int
1827 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1828                      __u32 client_cksum, __u32 server_cksum,
1829                      struct osc_brw_async_args *aa)
1830 {
1831         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1832         enum cksum_types cksum_type;
1833         obd_dif_csum_fn *fn = NULL;
1834         int sector_size = 0;
1835         __u32 new_cksum;
1836         char *msg;
1837         int rc;
1838
1839         if (server_cksum == client_cksum) {
1840                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1841                 return 0;
1842         }
1843
1844         if (aa->aa_cli->cl_checksum_dump)
1845                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1846                                     server_cksum, client_cksum);
1847
1848         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1849                                            oa->o_flags : 0);
1850
1851         switch (cksum_type) {
1852         case OBD_CKSUM_T10IP512:
1853                 fn = obd_dif_ip_fn;
1854                 sector_size = 512;
1855                 break;
1856         case OBD_CKSUM_T10IP4K:
1857                 fn = obd_dif_ip_fn;
1858                 sector_size = 4096;
1859                 break;
1860         case OBD_CKSUM_T10CRC512:
1861                 fn = obd_dif_crc_fn;
1862                 sector_size = 512;
1863                 break;
1864         case OBD_CKSUM_T10CRC4K:
1865                 fn = obd_dif_crc_fn;
1866                 sector_size = 4096;
1867                 break;
1868         default:
1869                 break;
1870         }
1871
1872         if (fn)
1873                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1874                                              aa->aa_page_count, aa->aa_ppga,
1875                                              OST_WRITE, fn, sector_size,
1876                                              &new_cksum);
1877         else
1878                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1879                                        aa->aa_ppga, OST_WRITE, cksum_type,
1880                                        &new_cksum);
1881
1882         if (rc < 0)
1883                 msg = "failed to calculate the client write checksum";
1884         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1885                 msg = "the server did not use the checksum type specified in "
1886                       "the original request - likely a protocol problem";
1887         else if (new_cksum == server_cksum)
1888                 msg = "changed on the client after we checksummed it - "
1889                       "likely false positive due to mmap IO (bug 11742)";
1890         else if (new_cksum == client_cksum)
1891                 msg = "changed in transit before arrival at OST";
1892         else
1893                 msg = "changed in transit AND doesn't match the original - "
1894                       "likely false positive due to mmap IO (bug 11742)";
1895
1896         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1897                            DFID " object "DOSTID" extent [%llu-%llu], original "
1898                            "client csum %x (type %x), server csum %x (type %x),"
1899                            " client csum now %x\n",
1900                            obd_name, msg, libcfs_nid2str(peer->nid),
1901                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1902                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1903                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1904                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1905                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1906                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1907                            client_cksum,
1908                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1909                            server_cksum, cksum_type, new_cksum);
1910         return 1;
1911 }
1912
1913 /* Note rc enters this function as the number of bytes transferred */
1914 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1915 {
1916         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1917         struct client_obd *cli = aa->aa_cli;
1918         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1919         const struct lnet_process_id *peer =
1920                 &req->rq_import->imp_connection->c_peer;
1921         struct ost_body *body;
1922         u32 client_cksum = 0;
1923         struct inode *inode;
1924         unsigned int blockbits = 0, blocksize = 0;
1925
1926         ENTRY;
1927
1928         if (rc < 0 && rc != -EDQUOT) {
1929                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1930                 RETURN(rc);
1931         }
1932
1933         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1934         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1935         if (body == NULL) {
1936                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1937                 RETURN(-EPROTO);
1938         }
1939
1940         /* set/clear the over-quota flag for a uid/gid/projid */
1941         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1942             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1943                 unsigned qid[LL_MAXQUOTAS] = {
1944                                          body->oa.o_uid, body->oa.o_gid,
1945                                          body->oa.o_projid };
1946                 CDEBUG(D_QUOTA,
1947                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1948                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1949                        body->oa.o_valid, body->oa.o_flags);
1950                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1951                                 body->oa.o_flags);
1952         }
1953
1954         osc_update_grant(cli, body);
1955
1956         if (rc < 0)
1957                 RETURN(rc);
1958
1959         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1960                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1961
1962         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1963                 if (rc > 0) {
1964                         CERROR("%s: unexpected positive size %d\n",
1965                                obd_name, rc);
1966                         RETURN(-EPROTO);
1967                 }
1968
1969                 if (req->rq_bulk != NULL &&
1970                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1971                         RETURN(-EAGAIN);
1972
1973                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1974                     check_write_checksum(&body->oa, peer, client_cksum,
1975                                          body->oa.o_cksum, aa))
1976                         RETURN(-EAGAIN);
1977
1978                 rc = check_write_rcs(req, aa->aa_requested_nob,
1979                                      aa->aa_nio_count, aa->aa_page_count,
1980                                      aa->aa_ppga);
1981                 GOTO(out, rc);
1982         }
1983
1984         /* The rest of this function executes only for OST_READs */
1985
1986         if (req->rq_bulk == NULL) {
1987                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1988                                           RCL_SERVER);
1989                 LASSERT(rc == req->rq_status);
1990         } else {
1991                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1992                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1993         }
1994         if (rc < 0)
1995                 GOTO(out, rc = -EAGAIN);
1996
1997         if (rc > aa->aa_requested_nob) {
1998                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1999                        rc, aa->aa_requested_nob);
2000                 RETURN(-EPROTO);
2001         }
2002
2003         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2004                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2005                        rc, req->rq_bulk->bd_nob_transferred);
2006                 RETURN(-EPROTO);
2007         }
2008
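        /* Short I/O read: the data came back inline in the reply buffer
         * instead of via bulk, so copy it into the destination pages. */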
2009         if (req->rq_bulk == NULL) {
2010                 /* short io */
2011                 int nob, pg_count, i = 0;
2012                 unsigned char *buf;
2013
2014                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2015                 pg_count = aa->aa_page_count;
2016                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2017                                                    rc);
2018                 nob = rc;
2019                 while (nob > 0 && pg_count > 0) {
2020                         unsigned char *ptr;
2021                         int count = aa->aa_ppga[i]->count > nob ?
2022                                     nob : aa->aa_ppga[i]->count;
2023
2024                         CDEBUG(D_CACHE, "page %p count %d\n",
2025                                aa->aa_ppga[i]->pg, count);
2026                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2027                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2028                                count);
2029                         kunmap_atomic((void *) ptr);
2030
2031                         buf += count;
2032                         nob -= count;
2033                         i++;
2034                         pg_count--;
2035                 }
2036         }
2037
2038         if (rc < aa->aa_requested_nob)
2039                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2040
2041         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2042                 static int cksum_counter;
2043                 u32        server_cksum = body->oa.o_cksum;
2044                 char      *via = "";
2045                 char      *router = "";
2046                 enum cksum_types cksum_type;
2047                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2048                         body->oa.o_flags : 0;
2049
2050                 cksum_type = obd_cksum_type_unpack(o_flags);
2051                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2052                                           aa->aa_page_count, aa->aa_ppga,
2053                                           OST_READ, &client_cksum);
2054                 if (rc < 0)
2055                         GOTO(out, rc);
2056
2057                 if (req->rq_bulk != NULL &&
2058                     peer->nid != req->rq_bulk->bd_sender) {
2059                         via = " via ";
2060                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2061                 }
2062
2063                 if (server_cksum != client_cksum) {
2064                         struct ost_body *clbody;
2065                         u32 page_count = aa->aa_page_count;
2066
2067                         clbody = req_capsule_client_get(&req->rq_pill,
2068                                                         &RMF_OST_BODY);
2069                         if (cli->cl_checksum_dump)
2070                                 dump_all_bulk_pages(&clbody->oa, page_count,
2071                                                     aa->aa_ppga, server_cksum,
2072                                                     client_cksum);
2073
2074                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2075                                            "%s%s%s inode "DFID" object "DOSTID
2076                                            " extent [%llu-%llu], client %x, "
2077                                            "server %x, cksum_type %x\n",
2078                                            obd_name,
2079                                            libcfs_nid2str(peer->nid),
2080                                            via, router,
2081                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2082                                                 clbody->oa.o_parent_seq : 0ULL,
2083                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2084                                                 clbody->oa.o_parent_oid : 0,
2085                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2086                                                 clbody->oa.o_parent_ver : 0,
2087                                            POSTID(&body->oa.o_oi),
2088                                            aa->aa_ppga[0]->off,
2089                                            aa->aa_ppga[page_count-1]->off +
2090                                            aa->aa_ppga[page_count-1]->count - 1,
2091                                            client_cksum, server_cksum,
2092                                            cksum_type);
2093                         cksum_counter = 0;
2094                         aa->aa_oa->o_cksum = client_cksum;
2095                         rc = -EAGAIN;
2096                 } else {
2097                         cksum_counter++;
2098                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2099                         rc = 0;
2100                 }
2101         } else if (unlikely(client_cksum)) {
2102                 static int cksum_missed;
2103
2104                 cksum_missed++;
2105                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2106                         CERROR("%s: checksum %u requested from %s but not sent\n",
2107                                obd_name, cksum_missed,
2108                                libcfs_nid2str(peer->nid));
2109         } else {
2110                 rc = 0;
2111         }
2112
2113         inode = page2inode(aa->aa_ppga[0]->pg);
2114         if (inode == NULL) {
2115                 /* Try to get a reference to the inode from the cl_page if
2116                  * we are dealing with direct IO, as the pages handled here
2117                  * are not actual page cache pages.
2118                  */
2119                 struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
2120
2121                 inode = oap2cl_page(oap)->cp_inode;
2122                 if (inode) {
2123                         blockbits = inode->i_blkbits;
2124                         blocksize = 1 << blockbits;
2125                 }
2126         }
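        /* For reads from encrypted files, decrypt what was just received, one
         * encryption unit at a time: block by block in place for direct IO,
         * via the pagecache helper otherwise. All-zero units are skipped, as
         * they denote holes. */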
2127         if (inode && IS_ENCRYPTED(inode)) {
2128                 int idx;
2129
2130                 if (!llcrypt_has_encryption_key(inode)) {
2131                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2132                         GOTO(out, rc);
2133                 }
2134                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2135                         struct brw_page *pg = aa->aa_ppga[idx];
2136                         unsigned int offs = 0;
2137
2138                         while (offs < PAGE_SIZE) {
2139                                 /* do not decrypt if page is all 0s */
2140                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2141                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2142                                         /* if the page is empty, forward this
2143                                          * info to upper layers (ll_io_zero_page)
2144                                          * by clearing PagePrivate2
2145                                          */
2146                                         if (!offs)
2147                                                 ClearPagePrivate2(pg->pg);
2148                                         break;
2149                                 }
2150
2151                                 if (blockbits) {
2152                                         /* This is the direct IO case. Directly
2153                                          * call the decrypt function that takes
2154                                          * the inode as an input parameter. The
2155                                          * page does not need to be locked.
2156                                          */
2157                                         u64 lblk_num =
2158                                                 ((u64)(pg->off >> PAGE_SHIFT) <<
2159                                                      (PAGE_SHIFT - blockbits)) +
2160                                                        (offs >> blockbits);
2161                                         unsigned int i;
2162
2163                                         for (i = offs;
2164                                              i < offs +
2165                                                     LUSTRE_ENCRYPTION_UNIT_SIZE;
2166                                              i += blocksize, lblk_num++) {
2167                                                 rc =
2168                                                   llcrypt_decrypt_block_inplace(
2169                                                           inode, pg->pg,
2170                                                           blocksize, i,
2171                                                           lblk_num);
2172                                                 if (rc)
2173                                                         break;
2174                                         }
2175                                 } else {
2176                                         rc = llcrypt_decrypt_pagecache_blocks(
2177                                                 pg->pg,
2178                                                 LUSTRE_ENCRYPTION_UNIT_SIZE,
2179                                                 offs);
2180                                 }
2181                                 if (rc)
2182                                         GOTO(out, rc);
2183
2184                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2185                         }
2186                 }
2187         }
2188
2189 out:
2190         if (rc >= 0)
2191                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2192                                      aa->aa_oa, &body->oa);
2193
2194         RETURN(rc);
2195 }
2196
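/* Rebuild and resend a BRW request after a recoverable error: the new request
 * takes over the pages, extents and async args of the old one and is queued
 * again through ptlrpcd. */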
2197 static int osc_brw_redo_request(struct ptlrpc_request *request,
2198                                 struct osc_brw_async_args *aa, int rc)
2199 {
2200         struct ptlrpc_request *new_req;
2201         struct osc_brw_async_args *new_aa;
2202         struct osc_async_page *oap;
2203         ENTRY;
2204
2205         /* The message below is checked in replay-ost-single.sh test_8ae */
2206         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2207                   "redo for recoverable error %d", rc);
2208
2209         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2210                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2211                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2212                                   aa->aa_ppga, &new_req, 1);
2213         if (rc)
2214                 RETURN(rc);
2215
2216         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2217                 if (oap->oap_request != NULL) {
2218                         LASSERTF(request == oap->oap_request,
2219                                  "request %p != oap_request %p\n",
2220                                  request, oap->oap_request);
2221                 }
2222         }
2223         /*
2224          * The new request takes over pga and oaps from the old request.
2225          * Note that copying a list_head doesn't work; it needs to be moved...
2226          */
2227         aa->aa_resends++;
2228         new_req->rq_interpret_reply = request->rq_interpret_reply;
2229         new_req->rq_async_args = request->rq_async_args;
2230         new_req->rq_commit_cb = request->rq_commit_cb;
2231         /* cap the resend delay to the current request timeout; this is similar
2232          * to what ptlrpc does (see after_reply()) */
2233         if (aa->aa_resends > new_req->rq_timeout)
2234                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2235         else
2236                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2237         new_req->rq_generation_set = 1;
2238         new_req->rq_import_generation = request->rq_import_generation;
2239
2240         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2241
2242         INIT_LIST_HEAD(&new_aa->aa_oaps);
2243         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2244         INIT_LIST_HEAD(&new_aa->aa_exts);
2245         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2246         new_aa->aa_resends = aa->aa_resends;
2247
2248         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2249                 if (oap->oap_request) {
2250                         ptlrpc_req_finished(oap->oap_request);
2251                         oap->oap_request = ptlrpc_request_addref(new_req);
2252                 }
2253         }
2254
2255         /* XXX: This code will run into problems if we ever support adding a
2256          * series of BRW RPCs to a self-defined ptlrpc_request_set and waiting
2257          * for all of them to finish. We should inherit the request set from
2258          * the old request. */
2259         ptlrpcd_add_req(new_req);
2260
2261         DEBUG_REQ(D_INFO, new_req, "new request");
2262         RETURN(0);
2263 }
2264
2265 /*
2266  * ugh, we want disk allocation on the target to happen in offset order. we'll
2267  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2268  * fine for our small page arrays and doesn't require allocation. it's an
2269  * insertion sort that swaps elements that are strides apart, shrinking the
2270  * stride down until it's 1 and the array is sorted.
2271  */
2272 static void sort_brw_pages(struct brw_page **array, int num)
2273 {
2274         int stride, i, j;
2275         struct brw_page *tmp;
2276
2277         if (num == 1)
2278                 return;
2279         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2280                 ;
2281
2282         do {
2283                 stride /= 3;
2284                 for (i = stride ; i < num ; i++) {
2285                         tmp = array[i];
2286                         j = i;
2287                         while (j >= stride && array[j - stride]->off > tmp->off) {
2288                                 array[j] = array[j - stride];
2289                                 j -= stride;
2290                         }
2291                         array[j] = tmp;
2292                 }
2293         } while (stride > 1);
2294 }
2295
2296 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2297 {
2298         LASSERT(ppga != NULL);
2299         OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2300 }
2301
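/* Completion callback for BRW RPCs: finalizes the reply, releases encryption
 * bounce pages, retries recoverable errors, updates cached object attributes
 * (size/KMS on writes), finishes the extents and releases the page array
 * before unplugging more IO. */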
2302 static int brw_interpret(const struct lu_env *env,
2303                          struct ptlrpc_request *req, void *args, int rc)
2304 {
2305         struct osc_brw_async_args *aa = args;
2306         struct osc_extent *ext;
2307         struct osc_extent *tmp;
2308         struct client_obd *cli = aa->aa_cli;
2309         unsigned long transferred = 0;
2310
2311         ENTRY;
2312
2313         rc = osc_brw_fini_request(req, rc);
2314         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2315
2316         /* restore clear text pages */
2317         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2318
2319         /*
2320          * When server returns -EINPROGRESS, client should always retry
2321          * regardless of the number of times the bulk was resent already.
2322          */
2323         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2324                 if (req->rq_import_generation !=
2325                     req->rq_import->imp_generation) {
2326                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2327                                ""DOSTID", rc = %d.\n",
2328                                req->rq_import->imp_obd->obd_name,
2329                                POSTID(&aa->aa_oa->o_oi), rc);
2330                 } else if (rc == -EINPROGRESS ||
2331                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2332                         rc = osc_brw_redo_request(req, aa, rc);
2333                 } else {
2334                         CERROR("%s: too many resent retries for object: "
2335                                "%llu:%llu, rc = %d.\n",
2336                                req->rq_import->imp_obd->obd_name,
2337                                POSTID(&aa->aa_oa->o_oi), rc);
2338                 }
2339
2340                 if (rc == 0)
2341                         RETURN(0);
2342                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2343                         rc = -EIO;
2344         }
2345
2346         if (rc == 0) {
2347                 struct obdo *oa = aa->aa_oa;
2348                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2349                 unsigned long valid = 0;
2350                 struct cl_object *obj;
2351                 struct osc_async_page *last;
2352
2353                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2354                 obj = osc2cl(last->oap_obj);
2355
2356                 cl_object_attr_lock(obj);
2357                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2358                         attr->cat_blocks = oa->o_blocks;
2359                         valid |= CAT_BLOCKS;
2360                 }
2361                 if (oa->o_valid & OBD_MD_FLMTIME) {
2362                         attr->cat_mtime = oa->o_mtime;
2363                         valid |= CAT_MTIME;
2364                 }
2365                 if (oa->o_valid & OBD_MD_FLATIME) {
2366                         attr->cat_atime = oa->o_atime;
2367                         valid |= CAT_ATIME;
2368                 }
2369                 if (oa->o_valid & OBD_MD_FLCTIME) {
2370                         attr->cat_ctime = oa->o_ctime;
2371                         valid |= CAT_CTIME;
2372                 }
2373
2374                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2375                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2376                         loff_t last_off = last->oap_count + last->oap_obj_off +
2377                                 last->oap_page_off;
2378
2379                         /* Change the file size if this is an out-of-quota or
2380                          * direct IO write and it extends the file size */
2381                         if (loi->loi_lvb.lvb_size < last_off) {
2382                                 attr->cat_size = last_off;
2383                                 valid |= CAT_SIZE;
2384                         }
2385                         /* Extend KMS if it's not a lockless write */
2386                         if (loi->loi_kms < last_off &&
2387                             oap2osc_page(last)->ops_srvlock == 0) {
2388                                 attr->cat_kms = last_off;
2389                                 valid |= CAT_KMS;
2390                         }
2391                 }
2392
2393                 if (valid != 0)
2394                         cl_object_attr_update(env, obj, attr, valid);
2395                 cl_object_attr_unlock(obj);
2396         }
2397         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2398         aa->aa_oa = NULL;
2399
2400         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2401                 osc_inc_unstable_pages(req);
2402
2403         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2404                 list_del_init(&ext->oe_link);
2405                 osc_extent_finish(env, ext, 1,
2406                                   rc && req->rq_no_delay ? -EAGAIN : rc);
2407         }
2408         LASSERT(list_empty(&aa->aa_exts));
2409         LASSERT(list_empty(&aa->aa_oaps));
2410
2411         transferred = (req->rq_bulk == NULL ? /* short io */
2412                        aa->aa_requested_nob :
2413                        req->rq_bulk->bd_nob_transferred);
2414
2415         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2416         ptlrpc_lprocfs_brw(req, transferred);
2417
2418         spin_lock(&cli->cl_loi_list_lock);
2419         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2420          * is called so we know whether to go to sync BRWs or wait for more
2421          * RPCs to complete */
2422         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2423                 cli->cl_w_in_flight--;
2424         else
2425                 cli->cl_r_in_flight--;
2426         osc_wake_cache_waiters(cli);
2427         spin_unlock(&cli->cl_loi_list_lock);
2428
2429         osc_io_unplug(env, cli, NULL);
2430         RETURN(rc);
2431 }
2432
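/* Commit callback: called once the server has committed the transaction this
 * request was part of, so pages accounted as unstable can be released. */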
2433 static void brw_commit(struct ptlrpc_request *req)
2434 {
2435         /* If osc_inc_unstable_pages (via osc_extent_finish) races with this
2436          * function, called via rq_commit_cb, we need to ensure
2437          * osc_dec_unstable_pages is still called. Otherwise unstable
2438          * pages may be leaked. */
2439         spin_lock(&req->rq_lock);
2440         if (likely(req->rq_unstable)) {
2441                 req->rq_unstable = 0;
2442                 spin_unlock(&req->rq_lock);
2443
2444                 osc_dec_unstable_pages(req);
2445         } else {
2446                 req->rq_committed = 1;
2447                 spin_unlock(&req->rq_lock);
2448         }
2449 }
2450
2451 /**
2452  * Build an RPC from the list of extents @ext_list. The caller must ensure
2453  * that the total number of pages in this list does not exceed the max pages
2454  * per RPC. Extents in the list must be in OES_RPC state.
2455  */
2456 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2457                   struct list_head *ext_list, int cmd)
2458 {
2459         struct ptlrpc_request           *req = NULL;
2460         struct osc_extent               *ext;
2461         struct brw_page                 **pga = NULL;
2462         struct osc_brw_async_args       *aa = NULL;
2463         struct obdo                     *oa = NULL;
2464         struct osc_async_page           *oap;
2465         struct osc_object               *obj = NULL;
2466         struct cl_req_attr              *crattr = NULL;
2467         loff_t                          starting_offset = OBD_OBJECT_EOF;
2468         loff_t                          ending_offset = 0;
2469         /* '1' for consistency with code that checks !mpflag to restore */
2470         int mpflag = 1;
2471         int                             mem_tight = 0;
2472         int                             page_count = 0;
2473         bool                            soft_sync = false;
2474         bool                            ndelay = false;
2475         int                             i;
2476         int                             grant = 0;
2477         int                             rc;
2478         __u32                           layout_version = 0;
2479         LIST_HEAD(rpc_list);
2480         struct ost_body                 *body;
2481         ENTRY;
2482         LASSERT(!list_empty(ext_list));
2483
2484         /* add pages to rpc_list to build the BRW RPC */
2485         list_for_each_entry(ext, ext_list, oe_link) {
2486                 LASSERT(ext->oe_state == OES_RPC);
2487                 mem_tight |= ext->oe_memalloc;
2488                 grant += ext->oe_grants;
2489                 page_count += ext->oe_nr_pages;
2490                 layout_version = max(layout_version, ext->oe_layout_version);
2491                 if (obj == NULL)
2492                         obj = ext->oe_obj;
2493         }
2494
2495         soft_sync = osc_over_unstable_soft_limit(cli);
2496         if (mem_tight)
2497                 mpflag = memalloc_noreclaim_save();
2498
2499         OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2500         if (pga == NULL)
2501                 GOTO(out, rc = -ENOMEM);
2502
2503         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2504         if (oa == NULL)
2505                 GOTO(out, rc = -ENOMEM);
2506
2507         i = 0;
2508         list_for_each_entry(ext, ext_list, oe_link) {
2509                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2510                         if (mem_tight)
2511                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2512                         if (soft_sync)
2513                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2514                         pga[i] = &oap->oap_brw_page;
2515                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2516                         i++;
2517
2518                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2519                         if (starting_offset == OBD_OBJECT_EOF ||
2520                             starting_offset > oap->oap_obj_off)
2521                                 starting_offset = oap->oap_obj_off;
2522                         else
2523                                 LASSERT(oap->oap_page_off == 0);
2524                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2525                                 ending_offset = oap->oap_obj_off +
2526                                                 oap->oap_count;
2527                         else
2528                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2529                                         PAGE_SIZE);
2530                 }
2531                 if (ext->oe_ndelay)
2532                         ndelay = true;
2533         }
2534
2535         /* first page in the list */
2536         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2537
2538         crattr = &osc_env_info(env)->oti_req_attr;
2539         memset(crattr, 0, sizeof(*crattr));
2540         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2541         crattr->cra_flags = ~0ULL;
2542         crattr->cra_page = oap2cl_page(oap);
2543         crattr->cra_oa = oa;
2544         cl_req_attr_set(env, osc2cl(obj), crattr);
2545
2546         if (cmd == OBD_BRW_WRITE) {
2547                 oa->o_grant_used = grant;
2548                 if (layout_version > 0) {
2549                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2550                                PFID(&oa->o_oi.oi_fid), layout_version);
2551
2552                         oa->o_layout_version = layout_version;
2553                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2554                 }
2555         }
2556
2557         sort_brw_pages(pga, page_count);
2558         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2559         if (rc != 0) {
2560                 CERROR("prep_req failed: %d\n", rc);
2561                 GOTO(out, rc);
2562         }
2563
2564         req->rq_commit_cb = brw_commit;
2565         req->rq_interpret_reply = brw_interpret;
2566         req->rq_memalloc = mem_tight != 0;
2567         oap->oap_request = ptlrpc_request_addref(req);
2568         if (ndelay) {
2569                 req->rq_no_resend = req->rq_no_delay = 1;
2570                 /* We should probably set a shorter timeout value here to
2571                  * handle ETIMEDOUT in brw_interpret() correctly. */
2572                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2573         }
2574
2575         /* Need to update the timestamps after the request is built, in case
2576          * we race with setattr (locally or in the queue at the OST).  If the
2577          * OST gets a later setattr before an earlier BRW (as determined by the
2578          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2579          * is no obvious way to do this in a single call.  bug 10150 */
2580         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2581         crattr->cra_oa = &body->oa;
2582         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2583         cl_req_attr_set(env, osc2cl(obj), crattr);
2584         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2585
2586         aa = ptlrpc_req_async_args(aa, req);
2587         INIT_LIST_HEAD(&aa->aa_oaps);
2588         list_splice_init(&rpc_list, &aa->aa_oaps);
2589         INIT_LIST_HEAD(&aa->aa_exts);
2590         list_splice_init(ext_list, &aa->aa_exts);
2591
2592         spin_lock(&cli->cl_loi_list_lock);
2593         starting_offset >>= PAGE_SHIFT;
2594         if (cmd == OBD_BRW_READ) {
2595                 cli->cl_r_in_flight++;
2596                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2597                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2598                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2599                                       starting_offset + 1);
2600         } else {
2601                 cli->cl_w_in_flight++;
2602                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2603                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2604                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2605                                       starting_offset + 1);
2606         }
2607         spin_unlock(&cli->cl_loi_list_lock);
2608
2609         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2610                   page_count, aa, cli->cl_r_in_flight,
2611                   cli->cl_w_in_flight);
2612         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2613
2614         ptlrpcd_add_req(req);
2615         rc = 0;
2616         EXIT;
2617
2618 out:
2619         if (mem_tight)
2620                 memalloc_noreclaim_restore(mpflag);
2621
2622         if (rc != 0) {
2623                 LASSERT(req == NULL);
2624
2625                 if (oa)
2626                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2627                 if (pga) {
2628                         osc_release_bounce_pages(pga, page_count);
2629                         osc_release_ppga(pga, page_count);
2630                 }
2631                 /* This should happen rarely and is pretty bad; it makes
2632                  * the pending list not follow the dirty order. */
2633                 while (!list_empty(ext_list)) {
2634                         ext = list_entry(ext_list->next, struct osc_extent,
2635                                          oe_link);
2636                         list_del_init(&ext->oe_link);
2637                         osc_extent_finish(env, ext, 0, rc);
2638                 }
2639         }
2640         RETURN(rc);
2641 }
2642
2643 /* This is to refresh our lock in the face of no RPCs. */
2644 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2645 {
2646         struct ptlrpc_request *req;
2647         struct obdo oa;
2648         struct brw_page bpg = { .off = start, .count = 1};
2649         struct brw_page *pga = &bpg;
2650         int rc;
2651
2652         memset(&oa, 0, sizeof(oa));
2653         oa.o_oi = osc->oo_oinfo->loi_oi;
2654         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2655         /* For updated servers - don't do a read */
2656         oa.o_flags = OBD_FL_NORPC;
2657
2658         rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2659                                   &req, 0);
2660
2661         /* If prep succeeded, ship the request off; if not, there is no
2662          * point in doing anything. Also no resends, no interpret callback,
2663          * no commit callback.
2664          */
2665         if (!rc) {
2666                 req->rq_no_resend = 1;
2667                 ptlrpcd_add_req(req);
2668         }
2669 }
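     /* Usage sketch (illustrative only, not a verbatim call site): a caller
      * holding a DLM extent lock with no I/O outstanding could keep the lock
      * active with something like
      *
      *      osc_send_empty_rpc(osc, lock->l_policy_data.l_extent.start);
      *
      * The OBD_FL_NORPC flag above tells an updated server to skip the read
      * entirely, so only the lock activity is refreshed. */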
2670
2671 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2672 {
2673         int set = 0;
2674
2675         LASSERT(lock != NULL);
2676
2677         lock_res_and_lock(lock);
2678
2679         if (lock->l_ast_data == NULL)
2680                 lock->l_ast_data = data;
2681         if (lock->l_ast_data == data)
2682                 set = 1;
2683
2684         unlock_res_and_lock(lock);
2685
2686         return set;
2687 }
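     /* Minimal sketch of the osc_set_lock_data() contract (illustrative):
      *
      *      if (osc_set_lock_data(lock, obj))
      *              the lock is now bound to obj, either freshly claimed
      *              here or already bound to obj by an earlier caller;
      *      else
      *              another object owns l_ast_data, leave the lock alone;
      *
      * i.e. the function returns 1 iff l_ast_data matches @data on exit. */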
2688
2689 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2690                      void *cookie, struct lustre_handle *lockh,
2691                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2692                      int errcode)
2693 {
2694         bool intent = *flags & LDLM_FL_HAS_INTENT;
2695         int rc;
2696         ENTRY;
2697
2698         /* The request was created before ldlm_cli_enqueue call. */
2699         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2700                 struct ldlm_reply *rep;
2701
2702                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2703                 LASSERT(rep != NULL);
2704
2705                 rep->lock_policy_res1 =
2706                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2707                 if (rep->lock_policy_res1)
2708                         errcode = rep->lock_policy_res1;
2709                 if (!speculative)
2710                         *flags |= LDLM_FL_LVB_READY;
2711         } else if (errcode == ELDLM_OK) {
2712                 *flags |= LDLM_FL_LVB_READY;
2713         }
2714
2715         /* Call the update callback. */
2716         rc = (*upcall)(cookie, lockh, errcode);
2717
2718         /* release the reference taken in ldlm_cli_enqueue() */
2719         if (errcode == ELDLM_LOCK_MATCHED)
2720                 errcode = ELDLM_OK;
2721         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2722                 ldlm_lock_decref(lockh, mode);
2723
2724         RETURN(rc);
2725 }
2726
2727 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2728                           void *args, int rc)
2729 {
2730         struct osc_enqueue_args *aa = args;
2731         struct ldlm_lock *lock;
2732         struct lustre_handle *lockh = &aa->oa_lockh;
2733         enum ldlm_mode mode = aa->oa_mode;
2734         struct ost_lvb *lvb = aa->oa_lvb;
2735         __u32 lvb_len = sizeof(*lvb);
2736         __u64 flags = 0;
2737         struct ldlm_enqueue_info einfo = {
2738                 .ei_type = aa->oa_type,
2739                 .ei_mode = mode,
2740         };
2741
2742         ENTRY;
2743
2744         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2745          * be valid. */
2746         lock = ldlm_handle2lock(lockh);
2747         LASSERTF(lock != NULL,
2748                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2749                  lockh->cookie, req, aa);
2750
2751         /* Take an additional reference so that a blocking AST that
2752          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2753          * to arrive after an upcall has been executed by
2754          * osc_enqueue_fini(). */
2755         ldlm_lock_addref(lockh, mode);
2756
2757         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2758         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2759
2760         /* Let the CP AST grant the lock first. */
2761         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2762
2763         if (aa->oa_speculative) {
2764                 LASSERT(aa->oa_lvb == NULL);
2765                 LASSERT(aa->oa_flags == NULL);
2766                 aa->oa_flags = &flags;
2767         }
2768
2769         /* Complete obtaining the lock procedure. */
2770         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2771                                    lvb, lvb_len, lockh, rc);
2772         /* Complete osc stuff. */
2773         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2774                               aa->oa_flags, aa->oa_speculative, rc);
2775
2776         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2777
2778         ldlm_lock_decref(lockh, mode);
2779         LDLM_LOCK_PUT(lock);
2780         RETURN(rc);
2781 }
2782
2783 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2784  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2785  * with other synchronous requests; however, holding some locks while trying to
2786  * obtain others may take a considerable amount of time in the case of OST
2787  * failure, and when a client does not release locks that other sync requests
2788  * are waiting for, the client is evicted from the cluster -- such scenarios
2789  * make life difficult, so release locks just after they are obtained. */
2790 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2791                      __u64 *flags, union ldlm_policy_data *policy,
2792                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2793                      void *cookie, struct ldlm_enqueue_info *einfo,
2794                      struct ptlrpc_request_set *rqset, int async,
2795                      bool speculative)
2796 {
2797         struct obd_device *obd = exp->exp_obd;
2798         struct lustre_handle lockh = { 0 };
2799         struct ptlrpc_request *req = NULL;
2800         int intent = *flags & LDLM_FL_HAS_INTENT;
2801         __u64 match_flags = *flags;
2802         enum ldlm_mode mode;
2803         int rc;
2804         ENTRY;
2805
2806         /* Filesystem lock extents are extended to page boundaries so that
2807          * dealing with the page cache is a little smoother.  */
2808         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2809         policy->l_extent.end |= ~PAGE_MASK;
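         /* Worked example (assuming 4 KiB pages, i.e. PAGE_MASK == ~0xfffULL):
          * a byte range [0x1234, 0x5678] becomes [0x1000, 0x5fff]: the start
          * is rounded down and the end rounded up to page boundaries. */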
2810
2811         /* Next, search for already existing extent locks that will cover us */
2812         /* If we're trying to read, we also search for an existing PW lock.  The
2813          * VFS and page cache already protect us locally, so lots of readers/
2814          * writers can share a single PW lock.
2815          *
2816          * There are problems with conversion deadlocks, so instead of
2817          * converting a read lock to a write lock, we'll just enqueue a new
2818          * one.
2819          *
2820          * At some point we should cancel the read lock instead of making them
2821          * send us a blocking callback, but there are problems with canceling
2822          * locks out from other users right now, too. */
2823         mode = einfo->ei_mode;
2824         if (einfo->ei_mode == LCK_PR)
2825                 mode |= LCK_PW;
2826         /* Normal lock requests must wait for the LVB to be ready before
2827          * matching a lock; speculative lock requests do not need to,
2828          * because they will not actually use the lock. */
2829         if (!speculative)
2830                 match_flags |= LDLM_FL_LVB_READY;
2831         if (intent != 0)
2832                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2833         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2834                                einfo->ei_type, policy, mode, &lockh);
2835         if (mode) {
2836                 struct ldlm_lock *matched;
2837
2838                 if (*flags & LDLM_FL_TEST_LOCK)
2839                         RETURN(ELDLM_OK);
2840
2841                 matched = ldlm_handle2lock(&lockh);
2842                 if (speculative) {
2843                         /* This DLM lock request is speculative, and does not
2844                          * have an associated IO request. Therefore, if there
2845                          * is already a DLM lock, it will just inform the
2846                          * caller to cancel the request for this stripe. */
2847                         lock_res_and_lock(matched);
2848                         if (ldlm_extent_equal(&policy->l_extent,
2849                             &matched->l_policy_data.l_extent))
2850                                 rc = -EEXIST;
2851                         else
2852                                 rc = -ECANCELED;
2853                         unlock_res_and_lock(matched);
2854
2855                         ldlm_lock_decref(&lockh, mode);
2856                         LDLM_LOCK_PUT(matched);
2857                         RETURN(rc);
2858                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2859                         *flags |= LDLM_FL_LVB_READY;
2860
2861                         /* We already have a lock, and it's referenced. */
2862                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2863
2864                         ldlm_lock_decref(&lockh, mode);
2865                         LDLM_LOCK_PUT(matched);
2866                         RETURN(ELDLM_OK);
2867                 } else {
2868                         ldlm_lock_decref(&lockh, mode);
2869                         LDLM_LOCK_PUT(matched);
2870                 }
2871         }
2872
2873         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2874                 RETURN(-ENOLCK);
2875
2876         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2877         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2878
2879         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2880                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2881         if (async) {
2882                 if (!rc) {
2883                         struct osc_enqueue_args *aa;
2884                         aa = ptlrpc_req_async_args(aa, req);
2885                         aa->oa_exp         = exp;
2886                         aa->oa_mode        = einfo->ei_mode;
2887                         aa->oa_type        = einfo->ei_type;
2888                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2889                         aa->oa_upcall      = upcall;
2890                         aa->oa_cookie      = cookie;
2891                         aa->oa_speculative = speculative;
2892                         if (!speculative) {
2893                                 aa->oa_flags  = flags;
2894                                 aa->oa_lvb    = lvb;
2895                         } else {
2896                                 /* speculative locks essentially enqueue a
2897                                  * DLM lock in advance, so we don't care
2898                                  * about the result of the enqueue. */
2899                                 aa->oa_lvb    = NULL;
2900                                 aa->oa_flags  = NULL;
2901                         }
2902
2903                         req->rq_interpret_reply = osc_enqueue_interpret;
2904                         ptlrpc_set_add_req(rqset, req);
2905                 }
2906                 RETURN(rc);
2907         }
2908
2909         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2910                               flags, speculative, rc);
2911
2912         RETURN(rc);
2913 }
2914
2915 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2916                    struct ldlm_res_id *res_id, enum ldlm_type type,
2917                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2918                    __u64 *flags, struct osc_object *obj,
2919                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2920 {
2921         struct obd_device *obd = exp->exp_obd;
2922         __u64 lflags = *flags;
2923         enum ldlm_mode rc;
2924         ENTRY;
2925
2926         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2927                 RETURN(-EIO);
2928
2929         /* Filesystem lock extents are extended to page boundaries so that
2930          * dealing with the page cache is a little smoother */
2931         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2932         policy->l_extent.end |= ~PAGE_MASK;
2933
2934         /* Next, search for already existing extent locks that will cover us */
2935         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2936                                         res_id, type, policy, mode, lockh,
2937                                         match_flags);
2938         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2939                 RETURN(rc);
2940
2941         if (obj != NULL) {
2942                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2943
2944                 LASSERT(lock != NULL);
2945                 if (osc_set_lock_data(lock, obj)) {
2946                         lock_res_and_lock(lock);
2947                         if (!ldlm_is_lvb_cached(lock)) {
2948                                 LASSERT(lock->l_ast_data == obj);
2949                                 osc_lock_lvb_update(env, obj, lock, NULL);
2950                                 ldlm_set_lvb_cached(lock);
2951                         }
2952                         unlock_res_and_lock(lock);
2953                 } else {
2954                         ldlm_lock_decref(lockh, rc);
2955                         rc = 0;
2956                 }
2957                 LDLM_LOCK_PUT(lock);
2958         }
2959         RETURN(rc);
2960 }
2961
2962 static int osc_statfs_interpret(const struct lu_env *env,
2963                                 struct ptlrpc_request *req, void *args, int rc)
2964 {
2965         struct osc_async_args *aa = args;
2966         struct obd_statfs *msfs;
2967
2968         ENTRY;
2969         if (rc == -EBADR)
2970                 /*
2971                  * The request has in fact never been sent due to issues at
2972                  * a higher level (LOV).  Exit immediately since the caller
2973                  * is aware of the problem and takes care of the clean up.
2974                  */
2975                 RETURN(rc);
2976
2977         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2978             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2979                 GOTO(out, rc = 0);
2980
2981         if (rc != 0)
2982                 GOTO(out, rc);
2983
2984         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2985         if (msfs == NULL)
2986                 GOTO(out, rc = -EPROTO);
2987
2988         *aa->aa_oi->oi_osfs = *msfs;
2989 out:
2990         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2991
2992         RETURN(rc);
2993 }
2994
2995 static int osc_statfs_async(struct obd_export *exp,
2996                             struct obd_info *oinfo, time64_t max_age,
2997                             struct ptlrpc_request_set *rqset)
2998 {
2999         struct obd_device     *obd = class_exp2obd(exp);
3000         struct ptlrpc_request *req;
3001         struct osc_async_args *aa;
3002         int rc;
3003         ENTRY;
3004
3005         if (obd->obd_osfs_age >= max_age) {
3006                 CDEBUG(D_SUPER,
3007                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3008                        obd->obd_name, &obd->obd_osfs,
3009                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3010                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3011                 spin_lock(&obd->obd_osfs_lock);
3012                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3013                 spin_unlock(&obd->obd_osfs_lock);
3014                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3015                 if (oinfo->oi_cb_up)
3016                         oinfo->oi_cb_up(oinfo, 0);
3017
3018                 RETURN(0);
3019         }
3020
3021         /* We could possibly pass max_age in the request (as an absolute
3022          * timestamp or a "seconds.usec ago") so the target can avoid doing
3023          * extra calls into the filesystem if that isn't necessary (e.g.
3024          * during mount that would help a bit).  Having relative timestamps
3025          * is not so great if request processing is slow, while absolute
3026          * timestamps are not ideal because they need time synchronization. */
3027         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3028         if (req == NULL)
3029                 RETURN(-ENOMEM);
3030
3031         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3032         if (rc) {
3033                 ptlrpc_request_free(req);
3034                 RETURN(rc);
3035         }
3036         ptlrpc_request_set_replen(req);
3037         req->rq_request_portal = OST_CREATE_PORTAL;
3038         ptlrpc_at_set_req_timeout(req);
3039
3040         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3041                 /* procfs requests should not wait on statfs, to avoid deadlock */
3042                 req->rq_no_resend = 1;
3043                 req->rq_no_delay = 1;
3044         }
3045
3046         req->rq_interpret_reply = osc_statfs_interpret;
3047         aa = ptlrpc_req_async_args(aa, req);
3048         aa->aa_oi = oinfo;
3049
3050         ptlrpc_set_add_req(rqset, req);
3051         RETURN(0);
3052 }
3053
3054 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3055                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3056 {
3057         struct obd_device     *obd = class_exp2obd(exp);
3058         struct obd_statfs     *msfs;
3059         struct ptlrpc_request *req;
3060         struct obd_import     *imp, *imp0;
3061         int rc;
3062         ENTRY;
3063
3064         /* Since the request might also come from lprocfs, we need to
3065          * sync this with client_disconnect_export() (Bug 15684).
3066          */
3067         with_imp_locked(obd, imp0, rc)
3068                 imp = class_import_get(imp0);
3069         if (rc)
3070                 RETURN(rc);
3071
3072         /* We could possibly pass max_age in the request (as an absolute
3073          * timestamp or a "seconds.usec ago") so the target can avoid doing
3074          * extra calls into the filesystem if that isn't necessary (e.g.
3075          * during mount that would help a bit).  Having relative timestamps
3076          * is not so great if request processing is slow, while absolute
3077          * timestamps are not ideal because they need time synchronization. */
3078         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3079
3080         class_import_put(imp);
3081
3082         if (req == NULL)
3083                 RETURN(-ENOMEM);
3084
3085         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3086         if (rc) {
3087                 ptlrpc_request_free(req);
3088                 RETURN(rc);
3089         }
3090         ptlrpc_request_set_replen(req);
3091         req->rq_request_portal = OST_CREATE_PORTAL;
3092         ptlrpc_at_set_req_timeout(req);
3093
3094         if (flags & OBD_STATFS_NODELAY) {
3095                 /* procfs requests should not wait on statfs, to avoid deadlock */
3096                 req->rq_no_resend = 1;
3097                 req->rq_no_delay = 1;
3098         }
3099
3100         rc = ptlrpc_queue_wait(req);
3101         if (rc)
3102                 GOTO(out, rc);
3103
3104         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3105         if (msfs == NULL)
3106                 GOTO(out, rc = -EPROTO);
3107
3108         *osfs = *msfs;
3109
3110         EXIT;
3111 out:
3112         ptlrpc_req_finished(req);
3113         return rc;
3114 }
3115
3116 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3117                          void *karg, void __user *uarg)
3118 {
3119         struct obd_device *obd = exp->exp_obd;
3120         struct obd_ioctl_data *data = karg;
3121         int rc = 0;
3122
3123         ENTRY;
3124         if (!try_module_get(THIS_MODULE)) {
3125                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3126                        module_name(THIS_MODULE));
3127                 return -EINVAL;
3128         }
3129         switch (cmd) {
3130         case OBD_IOC_CLIENT_RECOVER:
3131                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3132                                            data->ioc_inlbuf1, 0);
3133                 if (rc > 0)
3134                         rc = 0;
3135                 break;
3136         case IOC_OSC_SET_ACTIVE:
3137                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3138                                               data->ioc_offset);
3139                 break;
3140         default:
3141                 rc = -ENOTTY;
3142                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3143                        obd->obd_name, cmd, current->comm, rc);
3144                 break;
3145         }
3146
3147         module_put(THIS_MODULE);
3148         return rc;
3149 }
3150
3151 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3152                        u32 keylen, void *key, u32 vallen, void *val,
3153                        struct ptlrpc_request_set *set)
3154 {
3155         struct ptlrpc_request *req;
3156         struct obd_device     *obd = exp->exp_obd;
3157         struct obd_import     *imp = class_exp2cliimp(exp);
3158         char                  *tmp;
3159         int                    rc;
3160         ENTRY;
3161
3162         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3163
3164         if (KEY_IS(KEY_CHECKSUM)) {
3165                 if (vallen != sizeof(int))
3166                         RETURN(-EINVAL);
3167                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3168                 RETURN(0);
3169         }
3170
3171         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3172                 sptlrpc_conf_client_adapt(obd);
3173                 RETURN(0);
3174         }
3175
3176         if (KEY_IS(KEY_FLUSH_CTX)) {
3177                 sptlrpc_import_flush_my_ctx(imp);
3178                 RETURN(0);
3179         }
3180
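         /* Illustrative numbers for the LRU shrink request handled below:
          * with 1000 pages on the LRU list and a caller target of 200, at
          * most min(1000 >> 1, 200) == 200 pages are shrunk, and the count
          * actually freed is subtracted from the caller's target. */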
3181         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3182                 struct client_obd *cli = &obd->u.cli;
3183                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3184                 long target = *(long *)val;
3185
3186                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3187                 *(long *)val -= nr;
3188                 RETURN(0);
3189         }
3190
3191         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3192                 RETURN(-EINVAL);
3193
3194         /* We pass all other commands directly to the OST. Since nobody calls
3195          * osc methods directly and everybody is supposed to go through LOV, we
3196          * assume LOV checked invalid values for us.
3197          * The only recognised values so far are evict_by_nid and mds_conn.
3198          * Even if something bad goes through, we'd get a -EINVAL from the OST
3199          * anyway. */
3200
3201         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3202                                                 &RQF_OST_SET_GRANT_INFO :
3203                                                 &RQF_OBD_SET_INFO);
3204         if (req == NULL)
3205                 RETURN(-ENOMEM);
3206
3207         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3208                              RCL_CLIENT, keylen);
3209         if (!KEY_IS(KEY_GRANT_SHRINK))
3210                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3211                                      RCL_CLIENT, vallen);
3212         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3213         if (rc) {
3214                 ptlrpc_request_free(req);
3215                 RETURN(rc);
3216         }
3217
3218         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3219         memcpy(tmp, key, keylen);
3220         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3221                                                         &RMF_OST_BODY :
3222                                                         &RMF_SETINFO_VAL);
3223         memcpy(tmp, val, vallen);
3224
3225         if (KEY_IS(KEY_GRANT_SHRINK)) {
3226                 struct osc_grant_args *aa;
3227                 struct obdo *oa;
3228
3229                 aa = ptlrpc_req_async_args(aa, req);
3230                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3231                 if (!oa) {
3232                         ptlrpc_req_finished(req);
3233                         RETURN(-ENOMEM);
3234                 }
3235                 *oa = ((struct ost_body *)val)->oa;
3236                 aa->aa_oa = oa;
3237                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3238         }
3239
3240         ptlrpc_request_set_replen(req);
3241         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3242                 LASSERT(set != NULL);
3243                 ptlrpc_set_add_req(set, req);
3244                 ptlrpc_check_set(NULL, set);
3245         } else {
3246                 ptlrpcd_add_req(req);
3247         }
3248
3249         RETURN(0);
3250 }
3251 EXPORT_SYMBOL(osc_set_info_async);
3252
3253 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3254                   struct obd_device *obd, struct obd_uuid *cluuid,
3255                   struct obd_connect_data *data, void *localdata)
3256 {
3257         struct client_obd *cli = &obd->u.cli;
3258
3259         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3260                 long lost_grant;
3261                 long grant;
3262
3263                 spin_lock(&cli->cl_loi_list_lock);
3264                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3265                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3266                         /* restore ocd_grant_blkbits as client page bits */
3267                         data->ocd_grant_blkbits = PAGE_SHIFT;
3268                         grant += cli->cl_dirty_grant;
3269                 } else {
3270                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3271                 }
3272                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
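                 /* Illustrative numbers (4 KiB pages, no GRANT_PARAM): 256 KiB
                  * of available grant plus 64 dirty pages asks the server for
                  * 256 KiB + 256 KiB == 512 KiB; with no grant held at all we
                  * fall back to twice the BRW size. */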
3273                 lost_grant = cli->cl_lost_grant;
3274                 cli->cl_lost_grant = 0;
3275                 spin_unlock(&cli->cl_loi_list_lock);
3276
3277                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3278                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3279                        data->ocd_version, data->ocd_grant, lost_grant);
3280         }
3281
3282         RETURN(0);
3283 }
3284 EXPORT_SYMBOL(osc_reconnect);
3285
3286 int osc_disconnect(struct obd_export *exp)
3287 {
3288         struct obd_device *obd = class_exp2obd(exp);
3289         int rc;
3290
3291         rc = client_disconnect_export(exp);
3292         /**
3293          * Initially we put del_shrink_grant before disconnect_export, but it
3294          * causes the following problem if setup (connect) and cleanup
3295          * (disconnect) are tangled together.
3296          *      connect p1                     disconnect p2
3297          *   ptlrpc_connect_import
3298          *     ...............               class_manual_cleanup
3299          *                                     osc_disconnect
3300          *                                     del_shrink_grant
3301          *   ptlrpc_connect_interrupt
3302          *     osc_init_grant
3303          *   add this client to shrink list
3304          *                                      cleanup_osc
3305          * Bang! The grant shrink thread triggers the shrink. BUG18662
3306          */
3307         osc_del_grant_list(&obd->u.cli);
3308         return rc;
3309 }
3310 EXPORT_SYMBOL(osc_disconnect);
3311
3312 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3313                                  struct hlist_node *hnode, void *arg)
3314 {
3315         struct lu_env *env = arg;
3316         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3317         struct ldlm_lock *lock;
3318         struct osc_object *osc = NULL;
3319         ENTRY;
3320
3321         lock_res(res);
3322         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3323                 if (lock->l_ast_data != NULL && osc == NULL) {
3324                         osc = lock->l_ast_data;
3325                         cl_object_get(osc2cl(osc));
3326                 }
3327
3328                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3329                  * by the 2nd round of ldlm_namespace_clean() call in
3330                  * osc_import_event(). */
3331                 ldlm_clear_cleaned(lock);
3332         }
3333         unlock_res(res);
3334
3335         if (osc != NULL) {
3336                 osc_object_invalidate(env, osc);
3337                 cl_object_put(env, osc2cl(osc));
3338         }
3339
3340         RETURN(0);
3341 }
3342 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3343
3344 static int osc_import_event(struct obd_device *obd,
3345                             struct obd_import *imp,
3346                             enum obd_import_event event)
3347 {
3348         struct client_obd *cli;
3349         int rc = 0;
3350
3351         ENTRY;
3352         LASSERT(imp->imp_obd == obd);
3353
3354         switch (event) {
3355         case IMP_EVENT_DISCON: {
3356                 cli = &obd->u.cli;
3357                 spin_lock(&cli->cl_loi_list_lock);
3358                 cli->cl_avail_grant = 0;
3359                 cli->cl_lost_grant = 0;
3360                 spin_unlock(&cli->cl_loi_list_lock);
3361                 break;
3362         }
3363         case IMP_EVENT_INACTIVE: {
3364                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3365                 break;
3366         }
3367         case IMP_EVENT_INVALIDATE: {
3368                 struct ldlm_namespace *ns = obd->obd_namespace;
3369                 struct lu_env         *env;
3370                 __u16                  refcheck;
3371
3372                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3373
3374                 env = cl_env_get(&refcheck);
3375                 if (!IS_ERR(env)) {
3376                         osc_io_unplug(env, &obd->u.cli, NULL);
3377
3378                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3379                                                  osc_ldlm_resource_invalidate,
3380                                                  env, 0);
3381                         cl_env_put(env, &refcheck);
3382
3383                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3384                 } else
3385                         rc = PTR_ERR(env);
3386                 break;
3387         }
3388         case IMP_EVENT_ACTIVE: {
3389                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3390                 break;
3391         }
3392         case IMP_EVENT_OCD: {
3393                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3394
3395                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3396                         osc_init_grant(&obd->u.cli, ocd);
3397
3398                 /* See bug 7198 */
3399                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3400                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3401
3402                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3403                 break;
3404         }
3405         case IMP_EVENT_DEACTIVATE: {
3406                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3407                 break;
3408         }
3409         case IMP_EVENT_ACTIVATE: {
3410                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3411                 break;
3412         }
3413         default:
3414                 CERROR("Unknown import event %d\n", event);
3415                 LBUG();
3416         }
3417         RETURN(rc);
3418 }
3419
3420 /**
3421  * Determine whether the lock can be canceled before replaying the lock
3422  * during recovery, see bug16774 for detailed information.
3423  *
3424  * \retval zero the lock can't be canceled
3425  * \retval other ok to cancel
3426  */
3427 static int osc_cancel_weight(struct ldlm_lock *lock)
3428 {
3429         /*
3430          * Cancel all unused, granted extent locks.
3431          */
3432         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3433             ldlm_is_granted(lock) &&
3434             osc_ldlm_weigh_ast(lock) == 0)
3435                 RETURN(1);
3436
3437         RETURN(0);
3438 }
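     /* Note (assumption based on the weigh callback's use here):
      * osc_ldlm_weigh_ast() == 0 is taken to mean the lock covers no cached
      * pages, so cancelling it before replay loses nothing. */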
3439
3440 static int brw_queue_work(const struct lu_env *env, void *data)
3441 {
3442         struct client_obd *cli = data;
3443
3444         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3445
3446         osc_io_unplug(env, cli, NULL);
3447         RETURN(0);
3448 }
3449
3450 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3451 {
3452         struct client_obd *cli = &obd->u.cli;
3453         void *handler;
3454         int rc;
3455
3456         ENTRY;
3457
3458         rc = ptlrpcd_addref();
3459         if (rc)
3460                 RETURN(rc);
3461
3462         rc = client_obd_setup(obd, lcfg);
3463         if (rc)
3464                 GOTO(out_ptlrpcd, rc);
3465
3466
3467         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3468         if (IS_ERR(handler))
3469                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3470         cli->cl_writeback_work = handler;
3471
3472         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3473         if (IS_ERR(handler))
3474                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3475         cli->cl_lru_work = handler;
3476
3477         rc = osc_quota_setup(obd);
3478         if (rc)
3479                 GOTO(out_ptlrpcd_work, rc);
3480
3481         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3482         osc_update_next_shrink(cli);
3483
3484         RETURN(rc);
3485
3486 out_ptlrpcd_work:
3487         if (cli->cl_writeback_work != NULL) {
3488                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3489                 cli->cl_writeback_work = NULL;
3490         }
3491         if (cli->cl_lru_work != NULL) {
3492                 ptlrpcd_destroy_work(cli->cl_lru_work);
3493                 cli->cl_lru_work = NULL;
3494         }
3495         client_obd_cleanup(obd);
3496 out_ptlrpcd:
3497         ptlrpcd_decref();
3498         RETURN(rc);
3499 }
3500 EXPORT_SYMBOL(osc_setup_common);
3501
3502 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3503 {
3504         struct client_obd *cli = &obd->u.cli;
3505         int                adding;
3506         int                added;
3507         int                req_count;
3508         int                rc;
3509
3510         ENTRY;
3511
3512         rc = osc_setup_common(obd, lcfg);
3513         if (rc < 0)
3514                 RETURN(rc);
3515
3516         rc = osc_tunables_init(obd);
3517         if (rc)
3518                 RETURN(rc);
3519
3520         /*
3521          * We try to control the total number of requests with an upper
3522          * limit, osc_reqpool_maxreqcount. There might be a race that causes
3523          * over-limit allocation, but that is fine.
3524          */
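         /* Illustrative accounting: with cl_max_rpcs_in_flight == 8, this
          * device tries to add 8 + 2 == 10 requests to the pool, clamped so
          * the global count never exceeds osc_reqpool_maxreqcount. */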
3525         req_count = atomic_read(&osc_pool_req_count);
3526         if (req_count < osc_reqpool_maxreqcount) {
3527                 adding = cli->cl_max_rpcs_in_flight + 2;
3528                 if (req_count + adding > osc_reqpool_maxreqcount)
3529                         adding = osc_reqpool_maxreqcount - req_count;
3530
3531                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3532                 atomic_add(added, &osc_pool_req_count);
3533         }
3534
3535         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3536
3537         spin_lock(&osc_shrink_lock);
3538         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3539         spin_unlock(&osc_shrink_lock);
3540         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3541         cli->cl_import->imp_idle_debug = D_HA;
3542
3543         RETURN(0);
3544 }
3545
3546 int osc_precleanup_common(struct obd_device *obd)
3547 {
3548         struct client_obd *cli = &obd->u.cli;
3549         ENTRY;
3550
3551         /* LU-464
3552          * for echo client, export may be on zombie list, wait for
3553          * zombie thread to cull it, because cli.cl_import will be
3554          * cleared in client_disconnect_export():
3555          *   class_export_destroy() -> obd_cleanup() ->
3556          *   echo_device_free() -> echo_client_cleanup() ->
3557          *   obd_disconnect() -> osc_disconnect() ->
3558          *   client_disconnect_export()
3559          */
3560         obd_zombie_barrier();
3561         if (cli->cl_writeback_work) {
3562                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3563                 cli->cl_writeback_work = NULL;
3564         }
3565
3566         if (cli->cl_lru_work) {
3567                 ptlrpcd_destroy_work(cli->cl_lru_work);
3568                 cli->cl_lru_work = NULL;
3569         }
3570
3571         obd_cleanup_client_import(obd);
3572         RETURN(0);
3573 }
3574 EXPORT_SYMBOL(osc_precleanup_common);
3575
3576 static int osc_precleanup(struct obd_device *obd)
3577 {
3578         ENTRY;
3579
3580         osc_precleanup_common(obd);
3581
3582         ptlrpc_lprocfs_unregister_obd(obd);
3583         RETURN(0);
3584 }
3585
3586 int osc_cleanup_common(struct obd_device *obd)
3587 {
3588         struct client_obd *cli = &obd->u.cli;
3589         int rc;
3590
3591         ENTRY;
3592
3593         spin_lock(&osc_shrink_lock);
3594         list_del(&cli->cl_shrink_list);
3595         spin_unlock(&osc_shrink_lock);
3596
3597         /* lru cleanup */
3598         if (cli->cl_cache != NULL) {
3599                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3600                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3601                 list_del_init(&cli->cl_lru_osc);
3602                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3603                 cli->cl_lru_left = NULL;
3604                 cl_cache_decref(cli->cl_cache);
3605                 cli->cl_cache = NULL;
3606         }
3607
3608         /* free memory of osc quota cache */
3609         osc_quota_cleanup(obd);
3610
3611         rc = client_obd_cleanup(obd);
3612
3613         ptlrpcd_decref();
3614         RETURN(rc);
3615 }
3616 EXPORT_SYMBOL(osc_cleanup_common);
3617
3618 static const struct obd_ops osc_obd_ops = {
3619         .o_owner                = THIS_MODULE,
3620         .o_setup                = osc_setup,
3621         .o_precleanup           = osc_precleanup,
3622         .o_cleanup              = osc_cleanup_common,
3623         .o_add_conn             = client_import_add_conn,
3624         .o_del_conn             = client_import_del_conn,
3625         .o_connect              = client_connect_import,
3626         .o_reconnect            = osc_reconnect,
3627         .o_disconnect           = osc_disconnect,
3628         .o_statfs               = osc_statfs,
3629         .o_statfs_async         = osc_statfs_async,
3630         .o_create               = osc_create,
3631         .o_destroy              = osc_destroy,
3632         .o_getattr              = osc_getattr,
3633         .o_setattr              = osc_setattr,
3634         .o_iocontrol            = osc_iocontrol,
3635         .o_set_info_async       = osc_set_info_async,
3636         .o_import_event         = osc_import_event,
3637         .o_quotactl             = osc_quotactl,
3638 };
3639
3640 LIST_HEAD(osc_shrink_list);
3641 DEFINE_SPINLOCK(osc_shrink_lock);
3642
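     /* Kernels that define HAVE_SHRINKER_COUNT (the count_objects/scan_objects
      * shrinker API, mainline since v3.12) register the two callbacks
      * directly; older kernels get a single .shrink hook that emulates them. */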
3643 #ifdef HAVE_SHRINKER_COUNT
3644 static struct shrinker osc_cache_shrinker = {
3645         .count_objects  = osc_cache_shrink_count,
3646         .scan_objects   = osc_cache_shrink_scan,
3647         .seeks          = DEFAULT_SEEKS,
3648 };
3649 #else
3650 static int osc_cache_shrink(struct shrinker *shrinker,
3651                             struct shrink_control *sc)
3652 {
3653         (void)osc_cache_shrink_scan(shrinker, sc);
3654
3655         return osc_cache_shrink_count(shrinker, sc);
3656 }
3657
3658 static struct shrinker osc_cache_shrinker = {
3659         .shrink   = osc_cache_shrink,
3660         .seeks    = DEFAULT_SEEKS,
3661 };
3662 #endif
3663
3664 static int __init osc_init(void)
3665 {
3666         unsigned int reqpool_size;
3667         unsigned int reqsize;
3668         int rc;
3669         ENTRY;
3670
3671         /* Print the address of _any_ initialized kernel symbol from this
3672          * module, to allow debugging with a gdb that doesn't support data
3673          * symbols from modules. */
3674         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3675
3676         rc = lu_kmem_init(osc_caches);
3677         if (rc)
3678                 RETURN(rc);
3679
3680         rc = class_register_type(&osc_obd_ops, NULL, true,
3681                                  LUSTRE_OSC_NAME, &osc_device_type);
3682         if (rc)
3683                 GOTO(out_kmem, rc);
3684
3685         rc = register_shrinker(&osc_cache_shrinker);
3686         if (rc)
3687                 GOTO(out_type, rc);
3688
3689         /* This is obviously too much memory; only guard against overflow here */
3690         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3691                 GOTO(out_shrinker, rc = -EINVAL);
3692
3693         reqpool_size = osc_reqpool_mem_max << 20;
3694
3695         reqsize = 1;
3696         while (reqsize < OST_IO_MAXREQSIZE)
3697                 reqsize = reqsize << 1;
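         /* reqsize is now the smallest power of two >= OST_IO_MAXREQSIZE */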
3698
3699         /*
3700          * We don't enlarge the request count in the OSC pool according to
3701          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3702          * after a normal allocation has failed, so a small OSC pool won't
3703          * cause much performance degradation in most cases.
3704          */
3705         osc_reqpool_maxreqcount = reqpool_size / reqsize;
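         /* Worked example (illustrative; OST_IO_MAXREQSIZE is build
          * dependent): with the default osc_reqpool_mem_max of 5 MiB and
          * OST_IO_MAXREQSIZE rounding up to 1 MiB, the pool is capped at
          * 5 MiB / 1 MiB == 5 requests. */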
3706
3707         atomic_set(&osc_pool_req_count, 0);
3708         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3709                                           ptlrpc_add_rqs_to_pool);
3710
3711         if (osc_rq_pool == NULL)
3712                 GOTO(out_shrinker, rc = -ENOMEM);
3713
3714         rc = osc_start_grant_work();
3715         if (rc != 0)
3716                 GOTO(out_req_pool, rc);
3717
3718         RETURN(rc);
3719
3720 out_req_pool:
3721         ptlrpc_free_rq_pool(osc_rq_pool);
3722 out_shrinker:
3723         unregister_shrinker(&osc_cache_shrinker);
3724 out_type:
3725         class_unregister_type(LUSTRE_OSC_NAME);
3726 out_kmem:
3727         lu_kmem_fini(osc_caches);
3728
3729         RETURN(rc);
3730 }
3731
3732 static void __exit osc_exit(void)
3733 {
3734         osc_stop_grant_work();
3735         unregister_shrinker(&osc_cache_shrinker);
3736         class_unregister_type(LUSTRE_OSC_NAME);
3737         lu_kmem_fini(osc_caches);
3738         ptlrpc_free_rq_pool(osc_rq_pool);
3739 }
3740
3741 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3742 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3743 MODULE_VERSION(LUSTRE_VERSION_STRING);
3744 MODULE_LICENSE("GPL");
3745
3746 module_init(osc_init);
3747 module_exit(osc_exit);