Whamcloud - gitweb
LU-14283 osc: avoid crash if ocd reset
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50 #include <linux/falloc.h>
51
52 #include "osc_internal.h"
53
/* number of requests currently allocated from the OSC request pool */
atomic_t osc_pool_req_count;
/* upper bound on requests held by the pool; presumably derived from
 * osc_reqpool_mem_max at module init (init code not visible here) */
unsigned int osc_reqpool_maxreqcount;
/* shared pool of preallocated ptlrpc requests for OSC BRW RPCs */
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

/* idle timeout in seconds; NOTE(review): presumably controls when an idle
 * OSC connection is disconnected — confirm against the import code */
static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);
64
/* BRW grant bookkeeping reuses the BRW async-args structure */
#define osc_grant_args osc_brw_async_args

/* completion context for async SETATTR/PUNCH/FALLOCATE RPCs; stored in
 * rq_async_args and consumed by osc_setattr_interpret() */
struct osc_setattr_args {
	struct obdo		*sa_oa;		/* obdo refreshed from reply */
	obd_enqueue_update_f	 sa_upcall;	/* caller completion callback */
	void			*sa_cookie;	/* opaque arg for sa_upcall */
};

/* completion context for OST_SYNC RPCs (osc_sync_interpret()) */
struct osc_fsync_args {
	struct osc_object	*fa_obj;	/* object being synced */
	struct obdo		*fa_oa;		/* obdo refreshed from reply */
	obd_enqueue_update_f	fa_upcall;	/* caller completion callback */
	void			*fa_cookie;	/* opaque arg for fa_upcall */
};

/* completion context for OST_LADVISE RPCs (osc_ladvise_interpret()) */
struct osc_ladvise_args {
	struct obdo		*la_oa;		/* obdo refreshed from reply */
	obd_enqueue_update_f	 la_upcall;	/* caller completion callback */
	void			*la_cookie;	/* opaque arg for la_upcall */
};

/* forward declarations; defined later in this file */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			 void *data, int rc);
89
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 {
92         struct ost_body *body;
93
94         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
95         LASSERT(body);
96
97         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
98 }
99
/* Synchronously fetch attributes of the object named by @oa from the OST.
 *
 * Sends an OST_GETATTR RPC, waits for the reply, and on success unpacks the
 * reply body back into @oa.  o_blksize is filled in locally from the client
 * BRW size (it is not carried in the reply).
 *
 * Returns 0 on success, negative errno on failure.
 */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* copy @oa into the request body in wire format */
	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* blksize is derived locally, not returned by the server */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);

	return rc;
}
142
/* Synchronously update attributes of the object described by @oa on the OST
 * via an OST_SETATTR RPC; on success the reply obdo is unpacked back into
 * @oa.  The caller must have set the object group (OBD_MD_FLGROUP).
 *
 * Returns 0 on success, negative errno on failure.
 */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* copy @oa into the request body in wire format */
	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);

	RETURN(rc);
}
183
/* Completion handler for asynchronous setattr-style RPCs (SETATTR, PUNCH,
 * FALLOCATE).  On RPC success, unpacks the reply obdo into sa->sa_oa; in
 * all cases the caller's upcall is invoked with the final status.
 *
 * Returns the value returned by the upcall.
 */
static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_setattr_args *sa = args;
	struct ost_body *body;

	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	/* always report completion to the caller, even on error */
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}
205
/* Send an OST_SETATTR RPC asynchronously.
 *
 * If @rqset is NULL the request is handed to ptlrpcd fire-and-forget style
 * (no reply interpretation, @upcall/@cookie unused); otherwise it is added
 * to @rqset and completed via osc_setattr_interpret(), which refreshes @oa
 * from the reply and invokes @upcall with @cookie.
 *
 * Returns 0 on successful submission, negative errno otherwise.
 */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct osc_setattr_args *sa;
	int			 rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply = osc_setattr_interpret;

		sa = ptlrpc_req_async_args(sa, req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
247
/* Completion handler for OST_LADVISE RPCs: copy the reply obdo back to the
 * caller's obdo and invoke the upcall with the final status.
 *
 * NOTE(review): the reply obdo is struct-copied raw here, unlike setattr
 * which converts via lustre_get_wire_obdo() — confirm this is intentional.
 */
static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	/* always report completion to the caller, even on error */
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}
268
/**
 * Send an OST_LADVISE RPC carrying @ladvise_hdr and its lah_count advice
 * records to the server.
 *
 * If rqset is NULL, do not wait for response; upcall and cookie may also
 * be NULL in that case.  Otherwise the request is added to @rqset and
 * completed via osc_ladvise_interpret().
 *
 * Returns 0 on successful submission, negative errno otherwise.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	struct osc_ladvise_args	*la;
	int			 rc;
	struct lu_ladvise	*req_ladvise;
	struct lu_ladvise	*ladvise = ladvise_hdr->lah_advise;
	int			 num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr	*req_ladvise_hdr;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	/* the LADVISE buffer is variable-length: size it for all records
	 * before packing */
	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	/* copy the header and the advice array into the request buffers */
	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	la = ptlrpc_req_async_args(la, req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
331
/* Synchronously create an object on the OST via an OST_CREATE RPC.
 *
 * Only echo-client objects are created through this path — the LASSERT
 * below requires an echo sequence.  On success the reply obdo is unpacked
 * back into @oa and o_blksize is filled in from the client BRW size.
 *
 * Returns 0 on success, negative errno on failure.
 */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int                    rc;
	ENTRY;

	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* blksize is derived locally, not returned by the server */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}
382
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384                    obd_enqueue_update_f upcall, void *cookie)
385 {
386         struct ptlrpc_request *req;
387         struct osc_setattr_args *sa;
388         struct obd_import *imp = class_exp2cliimp(exp);
389         struct ost_body *body;
390         int rc;
391
392         ENTRY;
393
394         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395         if (req == NULL)
396                 RETURN(-ENOMEM);
397
398         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399         if (rc < 0) {
400                 ptlrpc_request_free(req);
401                 RETURN(rc);
402         }
403
404         osc_set_io_portal(req);
405
406         ptlrpc_at_set_req_timeout(req);
407
408         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409
410         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411
412         ptlrpc_request_set_replen(req);
413
414         req->rq_interpret_reply = osc_setattr_interpret;
415         sa = ptlrpc_req_async_args(sa, req);
416         sa->sa_oa = oa;
417         sa->sa_upcall = upcall;
418         sa->sa_cookie = cookie;
419
420         ptlrpcd_add_req(req);
421
422         RETURN(0);
423 }
424 EXPORT_SYMBOL(osc_punch_send);
425
426 /**
427  * osc_fallocate_base() - Handles fallocate request.
428  *
429  * @exp:        Export structure
430  * @oa:         Attributes passed to OSS from client (obdo structure)
431  * @upcall:     Primary & supplementary group information
432  * @cookie:     Exclusive identifier
433  * @rqset:      Request list.
434  * @mode:       Operation done on given range.
435  *
436  * osc_fallocate_base() - Handles fallocate requests only. Only block
437  * allocation or standard preallocate operation is supported currently.
438  * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
439  * is supported via SETATTR request.
440  *
441  * Return: Non-zero on failure and O on success.
442  */
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444                        obd_enqueue_update_f upcall, void *cookie, int mode)
445 {
446         struct ptlrpc_request *req;
447         struct osc_setattr_args *sa;
448         struct ost_body *body;
449         struct obd_import *imp = class_exp2cliimp(exp);
450         int rc;
451         ENTRY;
452
453         /*
454          * Only mode == 0 (which is standard prealloc) is supported now.
455          * Punch is not supported yet.
456          */
457         if (mode & ~FALLOC_FL_KEEP_SIZE)
458                 RETURN(-EOPNOTSUPP);
459         oa->o_falloc_mode = mode;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_OST_FALLOCATE);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
467         if (rc != 0) {
468                 ptlrpc_request_free(req);
469                 RETURN(rc);
470         }
471
472         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
473         LASSERT(body);
474
475         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
476
477         ptlrpc_request_set_replen(req);
478
479         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
480         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481         sa = ptlrpc_req_async_args(sa, req);
482         sa->sa_oa = oa;
483         sa->sa_upcall = upcall;
484         sa->sa_cookie = cookie;
485
486         ptlrpcd_add_req(req);
487
488         RETURN(0);
489 }
490
/* Completion handler for OST_SYNC RPCs.
 *
 * On success, copies the reply obdo back to the caller and updates the osc
 * object's cached blocks attribute under the cl-object attribute lock.
 * In all cases the caller's upcall is invoked with the final status.
 */
static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_fsync_args *fa = args;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}

	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

out:
	/* always report completion to the caller, even on error */
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}
528
/* Send an OST_SYNC RPC for @obj asynchronously via @rqset.
 *
 * The oa's size/blocks fields are overloaded to carry the sync range (see
 * the comment below).  Completion runs osc_sync_interpret(), which updates
 * the object's cached attributes and invokes @upcall with @cookie.
 *
 * Returns 0 on successful submission, negative errno otherwise.
 */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export     *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int                    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	fa = ptlrpc_req_async_args(fa, req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN (0);
}
568
/* Find and cancel locally locks matched by @mode in the resource found by
 * @oa's object id.  Found locks are added into the @cancels list.  Returns
 * the number of locks added to @cancels (0 if the resource does not exist
 * or early-lock-cancel is disabled via procfs). */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	/* hold a reference on the resource while scanning/cancelling */
	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}
603
604 static int osc_destroy_interpret(const struct lu_env *env,
605                                  struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         wake_up(&cli->cl_destroy_waitq);
611
612         return 0;
613 }
614
/* Try to reserve a slot for one destroy RPC.
 *
 * Returns 1 if, after incrementing, the in-flight destroy count is within
 * cl_max_rpcs_in_flight; otherwise undoes the increment and returns 0.
 * If the counter was changed concurrently between the two atomic ops,
 * a slot may now be free, so wake the waitqueue to retry waiters.
 */
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
632
/* Destroy the object described by @oa on the OST.
 *
 * Matching local PW locks are cancelled first (early lock cancel) and
 * piggy-backed on the OST_DESTROY request.  The number of concurrent
 * destroy RPCs is throttled to cl_max_rpcs_in_flight via
 * osc_can_send_destroy()/osc_destroy_interpret().  The RPC itself is
 * handed to ptlrpcd without waiting for the reply.
 *
 * Returns 0 on successful submission, negative errno otherwise.
 */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	LIST_HEAD(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		/* drop the references taken for the cancel list */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight
		 */
		rc = l_wait_event_abortable_exclusive(
			cli->cl_destroy_waitq,
			osc_can_send_destroy(cli));
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(-EINTR);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}
692
/* Fill @oa's grant-related fields (o_dirty, o_undirty, o_grant, o_dropped)
 * with the client's current cache state so the server can adjust our grant.
 *
 * All accounting is read/updated under cl_loi_list_lock.  @oa must not
 * already have OBD_MD_FLBLOCKS/OBD_MD_FLGRANT set.  The sanity branches
 * clamp o_undirty to 0 when the counters look inconsistent.
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	/* with grant parameters, dirty is accounted in bytes, else pages */
	if (cli->cl_ocd_grant_param)
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;
		unsigned long undirty;

		/* ask for enough grant to keep a full pipeline of RPCs
		 * (max_pages_per_rpc * (max_rpcs_in_flight + 1)) busy */
		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (cli->cl_ocd_grant_param) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				     cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		}
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT &
				    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	/* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
	if (cli->cl_lost_grant > INT_MAX) {
		CDEBUG(D_CACHE,
		      "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
		      cli_name(cli), cli->cl_lost_grant);
		oa->o_dropped = INT_MAX;
	} else {
		oa->o_dropped = cli->cl_lost_grant;
	}
	/* only the reported portion is forgotten; the rest is kept for the
	 * next announcement */
	cli->cl_lost_grant -= oa->o_dropped;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
	       " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
	       oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}
764
765 void osc_update_next_shrink(struct client_obd *cli)
766 {
767         cli->cl_next_shrink_grant = ktime_get_seconds() +
768                                     cli->cl_grant_shrink_interval;
769
770         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
771                cli->cl_next_shrink_grant);
772 }
773
774 static void __osc_update_grant(struct client_obd *cli, u64 grant)
775 {
776         spin_lock(&cli->cl_loi_list_lock);
777         cli->cl_avail_grant += grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779 }
780
781 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
782 {
783         if (body->oa.o_valid & OBD_MD_FLGRANT) {
784                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
785                 __osc_update_grant(cli, body->oa.o_grant);
786         }
787 }
788
/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head	gtd_clients;	/* client_obds to scan */
	struct mutex		gtd_mutex;	/* presumably serializes access
						 * to gtd_clients — confirm */
	unsigned long		gtd_stopped:1;	/* thread-stopped flag */
};
/* single module-wide instance used by the grant shrink thread */
static struct grant_thread_data client_gtd;
798
/* Completion handler for grant-shrink set_info RPCs.
 *
 * On failure the grant we offered back in aa->aa_oa->o_grant is restored
 * to cl_avail_grant; on success any grant returned by the server is
 * absorbed.  The obdo allocated for the RPC is freed in both cases.
 */
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *args, int rc)
{
	struct osc_grant_args *aa = args;
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct ost_body *body;

	if (rc != 0) {
		/* shrink failed: take the offered grant back */
		__osc_update_grant(cli, aa->aa_oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
	aa->aa_oa = NULL;

	return rc;
}
821
822 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
823 {
824         spin_lock(&cli->cl_loi_list_lock);
825         oa->o_grant = cli->cl_avail_grant / 4;
826         cli->cl_avail_grant -= oa->o_grant;
827         spin_unlock(&cli->cl_loi_list_lock);
828         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
829                 oa->o_valid |= OBD_MD_FLFLAGS;
830                 oa->o_flags = 0;
831         }
832         oa->o_flags |= OBD_FL_SHRINK_GRANT;
833         osc_update_next_shrink(cli);
834 }
835
836 /* Shrink the current grant, either from some large amount to enough for a
837  * full set of in-flight RPCs, or if we have already shrunk to that limit
838  * then to enough for a single RPC.  This avoids keeping more grant than
839  * needed, and avoids shrinking the grant piecemeal. */
840 static int osc_shrink_grant(struct client_obd *cli)
841 {
842         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
843                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
844
845         spin_lock(&cli->cl_loi_list_lock);
846         if (cli->cl_avail_grant <= target_bytes)
847                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
848         spin_unlock(&cli->cl_loi_list_lock);
849
850         return osc_shrink_grant_to_target(cli, target_bytes);
851 }
852
/* Shrink the client's available grant down to @target_bytes by returning
 * the difference to the server via a KEY_GRANT_SHRINK set_info RPC.
 *
 * The target is floored at one full RPC worth of grant, and the operation
 * is skipped if available grant is already at or below the target.  The
 * available-grant check is repeated after the unlocked allocation since
 * the value may have changed meanwhile.  If the RPC submission fails, the
 * deducted grant is restored.  Returns 0 or a negative errno.
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	       *body;
	ENTRY;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	if (target_bytes >= cli->cl_avail_grant) {
		/* available grant has changed since target calculation */
		spin_unlock(&cli->cl_loi_list_lock);
		GOTO(out_free, rc = 0);
	}
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		/* submission failed: take the deducted grant back */
		__osc_update_grant(cli, body->oa.o_grant);
out_free:
	OBD_FREE_PTR(body);
	RETURN(rc);
}
903
904 static int osc_should_shrink_grant(struct client_obd *client)
905 {
906         time64_t next_shrink = client->cl_next_shrink_grant;
907
908         if (client->cl_import == NULL)
909                 return 0;
910
911         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
912             client->cl_import->imp_grant_shrink_disabled) {
913                 osc_update_next_shrink(client);
914                 return 0;
915         }
916
917         if (ktime_get_seconds() >= next_shrink - 5) {
918                 /* Get the current RPC size directly, instead of going via:
919                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
920                  * Keep comment here so that it can be found by searching. */
921                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
922
923                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
924                     client->cl_avail_grant > brw_size)
925                         return 1;
926                 else
927                         osc_update_next_shrink(client);
928         }
929         return 0;
930 }
931
/* Maximum number of grant-shrink RPCs sent per run of the grant work
 * handler, so a single pass cannot flood the wire. */
#define GRANT_SHRINK_RPC_BATCH  100

/* Delayed work item driving periodic grant shrinking for all clients. */
static struct delayed_work work;
935
/* Periodic work: walk all registered clients, send grant-shrink RPCs for
 * those that are due (at most GRANT_SHRINK_RPC_BATCH per run), then
 * re-arm the work for the earliest upcoming shrink deadline. */
static void osc_grant_work_handler(struct work_struct *data)
{
	struct client_obd *cli;
	int rpc_sent;
	/* true until the first client seeds next_shrink below */
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	rpc_sent = 0;
	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
			    cl_grant_chain) {
		if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli)) {
			osc_shrink_grant(cli);
			rpc_sent++;
		}

		/* track the earliest still-future shrink deadline */
		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
		} else {
			/* first client: take its deadline unconditionally */
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
		}
	}
	mutex_unlock(&client_gtd.gtd_mutex);

	/* do not re-arm once osc_stop_grant_work() has been called */
	if (client_gtd.gtd_stopped == 1)
		return;

	if (next_shrink > ktime_get_seconds()) {
		time64_t delay = next_shrink - ktime_get_seconds();

		schedule_delayed_work(&work, cfs_time_seconds(delay));
	} else {
		/* deadline already passed: run again as soon as possible */
		schedule_work(&work.work);
	}
}
975
/* Cancel any pending delayed run and kick the grant work immediately. */
void osc_schedule_grant_work(void)
{
	cancel_delayed_work_sync(&work);
	schedule_work(&work.work);
}
981
/**
 * Start grant work for returning grant to server for idle clients.
 *
 * Initializes the global grant-tracking data and schedules the work
 * handler once right away; the handler re-arms itself afterwards.
 *
 * \retval 0 always
 */
static int osc_start_grant_work(void)
{
	client_gtd.gtd_stopped = 0;
	mutex_init(&client_gtd.gtd_mutex);
	INIT_LIST_HEAD(&client_gtd.gtd_clients);

	INIT_DELAYED_WORK(&work, osc_grant_work_handler);
	schedule_work(&work.work);

	return 0;
}
996
/* Stop the grant work: mark it stopped so a running handler does not
 * re-arm itself, then wait for any pending/running instance to finish. */
static void osc_stop_grant_work(void)
{
	client_gtd.gtd_stopped = 1;
	cancel_delayed_work_sync(&work);
}
1002
/* Register \a client on the global grant-shrink list under gtd_mutex so
 * the grant work handler will consider it for shrinking. */
static void osc_add_grant_list(struct client_obd *client)
{
	mutex_lock(&client_gtd.gtd_mutex);
	list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
	mutex_unlock(&client_gtd.gtd_mutex);
}
1009
1010 static void osc_del_grant_list(struct client_obd *client)
1011 {
1012         if (list_empty(&client->cl_grant_chain))
1013                 return;
1014
1015         mutex_lock(&client_gtd.gtd_mutex);
1016         list_del_init(&client->cl_grant_chain);
1017         mutex_unlock(&client_gtd.gtd_mutex);
1018 }
1019
/* Initialize grant accounting for \a cli from connect data \a ocd after a
 * (re)connect: compute the truly available grant, and set chunk/extent
 * parameters depending on whether the server sent GRANT_PARAM. */
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		/* deduct what we already hold (reserved + dirty) from the
		 * server-announced total to get the available remainder */
		unsigned long consumed = cli->cl_reserved_grant;

		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			consumed += cli->cl_dirty_grant;
		else
			consumed += cli->cl_dirty_pages << PAGE_SHIFT;
		if (cli->cl_avail_grant < consumed) {
			CERROR("%s: granted %ld but already consumed %ld\n",
			       cli_name(cli), cli->cl_avail_grant, consumed);
			cli->cl_avail_grant = 0;
		} else {
			cli->cl_avail_grant -= consumed;
		}
	}

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		u64 size;
		int chunk_mask;

		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
		cli->cl_ocd_grant_param = 1;
	} else {
		/* server did not send grant parameters: fall back to
		 * page-granular accounting and default extent limits */
		cli->cl_ocd_grant_param = 0;
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE,
	       "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
	       cli_name(cli),
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
		osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, size_t page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            size_t page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0) {
1142                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1143                                i, remote_rcs[i], req);
1144                         return remote_rcs[i];
1145                 }
1146
1147                 if (remote_rcs[i] != 0) {
1148                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1149                                 i, remote_rcs[i], req);
1150                         return(-EPROTO);
1151                 }
1152         }
1153         if (req->rq_bulk != NULL &&
1154             req->rq_bulk->bd_nob_transferred != requested_nob) {
1155                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1156                        req->rq_bulk->bd_nob_transferred, requested_nob);
1157                 return(-EPROTO);
1158         }
1159
1160         return (0);
1161 }
1162
1163 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1164 {
1165         if (p1->flag != p2->flag) {
1166                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1167                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1168                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1169
1170                 /* warn if we try to combine flags that we don't know to be
1171                  * safe to combine */
1172                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1173                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1174                               "report this at https://jira.whamcloud.com/\n",
1175                               p1->flag, p2->flag);
1176                 }
1177                 return 0;
1178         }
1179
1180         return (p1->off + p1->count == p2->off);
1181 }
1182
1183 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/* Compute a T10-PI protected checksum over the bulk pages: per-sector DIF
 * guard tags are generated for each page into a scratch page, and the
 * scratch page contents are folded into a single hash stored in
 * \a check_sum.
 *
 * \param[in]  obd_name	   client obd name, for error messages
 * \param[in]  nob	   total number of bytes to checksum
 * \param[in]  pg_count	   number of entries in \a pga
 * \param[in]  pga	   array of pages covering the bulk
 * \param[in]  opc	   OST_READ or OST_WRITE (used for fault injection)
 * \param[in]  fn	   DIF guard-tag generation function
 * \param[in]  sector_size sector size used for guard-tag generation
 * \param[out] check_sum   resulting checksum
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
				   size_t pg_count, struct brw_page **pga,
				   int opc, obd_dif_csum_fn *fn,
				   int sector_size,
				   u32 *check_sum)
{
	struct ahash_request *req;
	/* Used Adler as the default checksum type on top of DIF tags */
	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
	struct page *__page;
	unsigned char *buffer;
	__u16 *guard_start;
	unsigned int bufsize;
	int guard_number;
	int used_number = 0;
	int used;
	u32 cksum;
	int rc = 0;
	int i = 0;

	LASSERT(pg_count > 0);

	/* scratch page accumulating the per-sector guard tags */
	__page = alloc_page(GFP_KERNEL);
	if (__page == NULL)
		return -ENOMEM;

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		rc = PTR_ERR(req);
		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
		GOTO(out, rc);
	}

	buffer = kmap(__page);
	guard_start = (__u16 *)buffer;
	guard_number = PAGE_SIZE / sizeof(*guard_start);
	while (nob > 0 && pg_count > 0) {
		/* the last page may be only partially covered by nob */
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (unlikely(i == 0 && opc == OST_READ &&
			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}

		/*
		 * The left guard number should be able to hold checksums of a
		 * whole page
		 */
		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
						  pga[i]->off & ~PAGE_MASK,
						  count,
						  guard_start + used_number,
						  guard_number - used_number,
						  &used, sector_size,
						  fn);
		if (rc)
			break;

		used_number += used;
		/* scratch page full: fold it into the hash and reuse it */
		if (used_number == guard_number) {
			cfs_crypto_hash_update_page(req, __page, 0,
				used_number * sizeof(*guard_start));
			used_number = 0;
		}

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}
	kunmap(__page);
	/* NOTE(review): on this error path the ahash request is not passed
	 * to cfs_crypto_hash_final() — verify the request is not leaked */
	if (rc)
		GOTO(out, rc);

	/* fold any remaining guard tags into the hash */
	if (used_number != 0)
		cfs_crypto_hash_update_page(req, __page, 0,
			used_number * sizeof(*guard_start));

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	*check_sum = cksum;
out:
	__free_page(__page);
	return rc;
}
1281 #else /* !CONFIG_CRC_T10DIF */
1282 #define obd_dif_ip_fn NULL
1283 #define obd_dif_crc_fn NULL
1284 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1285         -EOPNOTSUPP
1286 #endif /* CONFIG_CRC_T10DIF */
1287
/* Compute a plain (non-T10) checksum of \a nob bytes spread over the
 * \a pga pages, using the hash selected by \a cksum_type; the result is
 * stored in *cksum.  Returns 0 on success or a negative errno when the
 * hash cannot be initialized. */
static int osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type,
			     u32 *cksum)
{
	int                             i = 0;
	struct ahash_request           *req;
	unsigned int                    bufsize;
	unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(req);
	}

	while (nob > 0 && pg_count > 0) {
		/* the last page may be only partially covered by nob */
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(req, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(*cksum);
	cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		(*cksum)++;

	return 0;
}
1341
1342 static int osc_checksum_bulk_rw(const char *obd_name,
1343                                 enum cksum_types cksum_type,
1344                                 int nob, size_t pg_count,
1345                                 struct brw_page **pga, int opc,
1346                                 u32 *check_sum)
1347 {
1348         obd_dif_csum_fn *fn = NULL;
1349         int sector_size = 0;
1350         int rc;
1351
1352         ENTRY;
1353         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1354
1355         if (fn)
1356                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1357                                              opc, fn, sector_size, check_sum);
1358         else
1359                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1360                                        check_sum);
1361
1362         RETURN(rc);
1363 }
1364
/* Undo the brw_page adjustments made for client-side encryption in
 * osc_brw_prep_request(): free llcrypt bounce pages and restore the
 * clear-text count/off values.  No-op without crypto support. */
static inline void osc_release_bounce_pages(struct brw_page **pga,
					    u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
	int i;

	for (i = 0; i < page_count; i++) {
		/* Bounce pages allocated by a call to
		 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
		 * are identified thanks to the PageChecked flag.
		 */
		if (PageChecked(pga[i]->pg))
			llcrypt_finalize_bounce_page(&pga[i]->pg);
		/* restore the clear-text count/offset saved at prep time */
		pga[i]->count -= pga[i]->bp_count_diff;
		pga[i]->off += pga[i]->bp_off_diff;
	}
#endif
}
1383
1384 static int
1385 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1386                      u32 page_count, struct brw_page **pga,
1387                      struct ptlrpc_request **reqp, int resend)
1388 {
1389         struct ptlrpc_request *req;
1390         struct ptlrpc_bulk_desc *desc;
1391         struct ost_body *body;
1392         struct obd_ioobj *ioobj;
1393         struct niobuf_remote *niobuf;
1394         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1395         struct osc_brw_async_args *aa;
1396         struct req_capsule *pill;
1397         struct brw_page *pg_prev;
1398         void *short_io_buf;
1399         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1400         struct inode *inode;
1401         bool directio = false;
1402
1403         ENTRY;
1404         inode = page2inode(pga[0]->pg);
1405         if (inode == NULL) {
1406                 /* Try to get reference to inode from cl_page if we are
1407                  * dealing with direct IO, as handled pages are not
1408                  * actual page cache pages.
1409                  */
1410                 struct osc_async_page *oap = brw_page2oap(pga[0]);
1411                 struct cl_page *clpage = oap2cl_page(oap);
1412
1413                 inode = clpage->cp_inode;
1414                 if (inode)
1415                         directio = true;
1416         }
1417         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1418                 RETURN(-ENOMEM); /* Recoverable */
1419         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1420                 RETURN(-EINVAL); /* Fatal */
1421
1422         if ((cmd & OBD_BRW_WRITE) != 0) {
1423                 opc = OST_WRITE;
1424                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1425                                                 osc_rq_pool,
1426                                                 &RQF_OST_BRW_WRITE);
1427         } else {
1428                 opc = OST_READ;
1429                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1430         }
1431         if (req == NULL)
1432                 RETURN(-ENOMEM);
1433
1434         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1435                 for (i = 0; i < page_count; i++) {
1436                         struct brw_page *pg = pga[i];
1437                         struct page *data_page = NULL;
1438                         bool retried = false;
1439                         bool lockedbymyself;
1440                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1441                         struct address_space *map_orig = NULL;
1442                         pgoff_t index_orig;
1443
1444 retry_encrypt:
1445                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1446                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1447                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1448                         /* The page can already be locked when we arrive here.
1449                          * This is possible when cl_page_assume/vvp_page_assume
1450                          * is stuck on wait_on_page_writeback with page lock
1451                          * held. In this case there is no risk for the lock to
1452                          * be released while we are doing our encryption
1453                          * processing, because writeback against that page will
1454                          * end in vvp_page_completion_write/cl_page_completion,
1455                          * which means only once the page is fully processed.
1456                          */
1457                         lockedbymyself = trylock_page(pg->pg);
1458                         if (directio) {
1459                                 map_orig = pg->pg->mapping;
1460                                 pg->pg->mapping = inode->i_mapping;
1461                                 index_orig = pg->pg->index;
1462                                 pg->pg->index = pg->off >> PAGE_SHIFT;
1463                         }
1464                         data_page =
1465                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1466                                                                  nunits, 0,
1467                                                                  GFP_NOFS);
1468                         if (directio) {
1469                                 pg->pg->mapping = map_orig;
1470                                 pg->pg->index = index_orig;
1471                         }
1472                         if (lockedbymyself)
1473                                 unlock_page(pg->pg);
1474                         if (IS_ERR(data_page)) {
1475                                 rc = PTR_ERR(data_page);
1476                                 if (rc == -ENOMEM && !retried) {
1477                                         retried = true;
1478                                         rc = 0;
1479                                         goto retry_encrypt;
1480                                 }
1481                                 ptlrpc_request_free(req);
1482                                 RETURN(rc);
1483                         }
1484                         /* Set PageChecked flag on bounce page for
1485                          * disambiguation in osc_release_bounce_pages().
1486                          */
1487                         SetPageChecked(data_page);
1488                         pg->pg = data_page;
1489                         /* there should be no gap in the middle of page array */
1490                         if (i == page_count - 1) {
1491                                 struct osc_async_page *oap = brw_page2oap(pg);
1492
1493                                 oa->o_size = oap->oap_count +
1494                                         oap->oap_obj_off + oap->oap_page_off;
1495                         }
1496                         /* len is forced to nunits, and relative offset to 0
1497                          * so store the old, clear text info
1498                          */
1499                         pg->bp_count_diff = nunits - pg->count;
1500                         pg->count = nunits;
1501                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1502                         pg->off = pg->off & PAGE_MASK;
1503                 }
1504         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1505                 for (i = 0; i < page_count; i++) {
1506                         struct brw_page *pg = pga[i];
1507                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1508
1509                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1510                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1511                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1512                         /* count/off are forced to cover the whole encryption
1513                          * unit size so that all encrypted data is stored on the
1514                          * OST, so adjust bp_{count,off}_diff for the size of
1515                          * the clear text.
1516                          */
1517                         pg->bp_count_diff = nunits - pg->count;
1518                         pg->count = nunits;
1519                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1520                         pg->off = pg->off & PAGE_MASK;
1521                 }
1522         }
1523
1524         for (niocount = i = 1; i < page_count; i++) {
1525                 if (!can_merge_pages(pga[i - 1], pga[i]))
1526                         niocount++;
1527         }
1528
1529         pill = &req->rq_pill;
1530         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1531                              sizeof(*ioobj));
1532         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1533                              niocount * sizeof(*niobuf));
1534
1535         for (i = 0; i < page_count; i++) {
1536                 short_io_size += pga[i]->count;
1537                 if (!inode || !IS_ENCRYPTED(inode)) {
1538                         pga[i]->bp_count_diff = 0;
1539                         pga[i]->bp_off_diff = 0;
1540                 }
1541         }
1542
1543         /* Check if read/write is small enough to be a short io. */
1544         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1545             !imp_connect_shortio(cli->cl_import))
1546                 short_io_size = 0;
1547
1548         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1549                              opc == OST_READ ? 0 : short_io_size);
1550         if (opc == OST_READ)
1551                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1552                                      short_io_size);
1553
1554         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1555         if (rc) {
1556                 ptlrpc_request_free(req);
1557                 RETURN(rc);
1558         }
1559         osc_set_io_portal(req);
1560
1561         ptlrpc_at_set_req_timeout(req);
1562         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1563          * retry logic */
1564         req->rq_no_retry_einprogress = 1;
1565
1566         if (short_io_size != 0) {
1567                 desc = NULL;
1568                 short_io_buf = NULL;
1569                 goto no_bulk;
1570         }
1571
1572         desc = ptlrpc_prep_bulk_imp(req, page_count,
1573                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1574                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1575                         PTLRPC_BULK_PUT_SINK),
1576                 OST_BULK_PORTAL,
1577                 &ptlrpc_bulk_kiov_pin_ops);
1578
1579         if (desc == NULL)
1580                 GOTO(out, rc = -ENOMEM);
1581         /* NB request now owns desc and will free it when it gets freed */
1582 no_bulk:
1583         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1584         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1585         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1586         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1587
1588         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1589
1590         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1591          * and from_kgid(), because they are asynchronous. Fortunately, variable
1592          * oa contains valid o_uid and o_gid in these two operations.
1593          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1594          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1595          * other process logic */
1596         body->oa.o_uid = oa->o_uid;
1597         body->oa.o_gid = oa->o_gid;
1598
1599         obdo_to_ioobj(oa, ioobj);
1600         ioobj->ioo_bufcnt = niocount;
1601         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1602          * that might be send for this request.  The actual number is decided
1603          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1604          * "max - 1" for old client compatibility sending "0", and also so the
1605          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1606         if (desc != NULL)
1607                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1608         else /* short io */
1609                 ioobj_max_brw_set(ioobj, 0);
1610
1611         if (short_io_size != 0) {
1612                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1613                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1614                         body->oa.o_flags = 0;
1615                 }
1616                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1617                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1618                        short_io_size);
1619                 if (opc == OST_WRITE) {
1620                         short_io_buf = req_capsule_client_get(pill,
1621                                                               &RMF_SHORT_IO);
1622                         LASSERT(short_io_buf != NULL);
1623                 }
1624         }
1625
1626         LASSERT(page_count > 0);
1627         pg_prev = pga[0];
1628         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1629                 struct brw_page *pg = pga[i];
1630                 int poff = pg->off & ~PAGE_MASK;
1631
1632                 LASSERT(pg->count > 0);
1633                 /* make sure there is no gap in the middle of page array */
1634                 LASSERTF(page_count == 1 ||
1635                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1636                           ergo(i > 0 && i < page_count - 1,
1637                                poff == 0 && pg->count == PAGE_SIZE)   &&
1638                           ergo(i == page_count - 1, poff == 0)),
1639                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1640                          i, page_count, pg, pg->off, pg->count);
1641                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1642                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1643                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1644                          i, page_count,
1645                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1646                          pg_prev->pg, page_private(pg_prev->pg),
1647                          pg_prev->pg->index, pg_prev->off);
1648                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1649                         (pg->flag & OBD_BRW_SRVLOCK));
1650                 if (short_io_size != 0 && opc == OST_WRITE) {
1651                         unsigned char *ptr = kmap_atomic(pg->pg);
1652
1653                         LASSERT(short_io_size >= requested_nob + pg->count);
1654                         memcpy(short_io_buf + requested_nob,
1655                                ptr + poff,
1656                                pg->count);
1657                         kunmap_atomic(ptr);
1658                 } else if (short_io_size == 0) {
1659                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1660                                                          pg->count);
1661                 }
1662                 requested_nob += pg->count;
1663
1664                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1665                         niobuf--;
1666                         niobuf->rnb_len += pg->count;
1667                 } else {
1668                         niobuf->rnb_offset = pg->off;
1669                         niobuf->rnb_len    = pg->count;
1670                         niobuf->rnb_flags  = pg->flag;
1671                 }
1672                 pg_prev = pg;
1673         }
1674
1675         LASSERTF((void *)(niobuf - niocount) ==
1676                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1677                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1678                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1679
1680         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1681         if (resend) {
1682                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1683                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1684                         body->oa.o_flags = 0;
1685                 }
1686                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1687         }
1688
1689         if (osc_should_shrink_grant(cli))
1690                 osc_shrink_grant_local(cli, &body->oa);
1691
1692         /* size[REQ_REC_OFF] still sizeof (*body) */
1693         if (opc == OST_WRITE) {
1694                 if (cli->cl_checksum &&
1695                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1696                         /* store cl_cksum_type in a local variable since
1697                          * it can be changed via lprocfs */
1698                         enum cksum_types cksum_type = cli->cl_cksum_type;
1699
1700                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1701                                 body->oa.o_flags = 0;
1702
1703                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1704                                                                 cksum_type);
1705                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1706
1707                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1708                                                   requested_nob, page_count,
1709                                                   pga, OST_WRITE,
1710                                                   &body->oa.o_cksum);
1711                         if (rc < 0) {
1712                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1713                                        rc);
1714                                 GOTO(out, rc);
1715                         }
1716                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1717                                body->oa.o_cksum);
1718
1719                         /* save this in 'oa', too, for later checking */
1720                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1721                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1722                                                            cksum_type);
1723                 } else {
1724                         /* clear out the checksum flag, in case this is a
1725                          * resend but cl_checksum is no longer set. b=11238 */
1726                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1727                 }
1728                 oa->o_cksum = body->oa.o_cksum;
1729                 /* 1 RC per niobuf */
1730                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1731                                      sizeof(__u32) * niocount);
1732         } else {
1733                 if (cli->cl_checksum &&
1734                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1735                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1736                                 body->oa.o_flags = 0;
1737                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1738                                 cli->cl_cksum_type);
1739                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1740                 }
1741
1742                 /* Client cksum has been already copied to wire obdo in previous
1743                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1744                  * resent due to cksum error, this will allow Server to
1745                  * check+dump pages on its side */
1746         }
1747         ptlrpc_request_set_replen(req);
1748
1749         aa = ptlrpc_req_async_args(aa, req);
1750         aa->aa_oa = oa;
1751         aa->aa_requested_nob = requested_nob;
1752         aa->aa_nio_count = niocount;
1753         aa->aa_page_count = page_count;
1754         aa->aa_resends = 0;
1755         aa->aa_ppga = pga;
1756         aa->aa_cli = cli;
1757         INIT_LIST_HEAD(&aa->aa_oaps);
1758
1759         *reqp = req;
1760         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1761         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1762                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1763                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1764         RETURN(0);
1765
1766  out:
1767         ptlrpc_req_finished(req);
1768         RETURN(rc);
1769 }
1770
1771 char dbgcksum_file_name[PATH_MAX];
1772
1773 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1774                                 struct brw_page **pga, __u32 server_cksum,
1775                                 __u32 client_cksum)
1776 {
1777         struct file *filp;
1778         int rc, i;
1779         unsigned int len;
1780         char *buf;
1781
1782         /* will only keep dump of pages on first error for the same range in
1783          * file/fid, not during the resends/retries. */
1784         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1785                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1786                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1787                   libcfs_debug_file_path_arr :
1788                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1789                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1790                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1791                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1792                  pga[0]->off,
1793                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1794                  client_cksum, server_cksum);
1795         filp = filp_open(dbgcksum_file_name,
1796                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1797         if (IS_ERR(filp)) {
1798                 rc = PTR_ERR(filp);
1799                 if (rc == -EEXIST)
1800                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1801                                "checksum error: rc = %d\n", dbgcksum_file_name,
1802                                rc);
1803                 else
1804                         CERROR("%s: can't open to dump pages with checksum "
1805                                "error: rc = %d\n", dbgcksum_file_name, rc);
1806                 return;
1807         }
1808
1809         for (i = 0; i < page_count; i++) {
1810                 len = pga[i]->count;
1811                 buf = kmap(pga[i]->pg);
1812                 while (len != 0) {
1813                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1814                         if (rc < 0) {
1815                                 CERROR("%s: wanted to write %u but got %d "
1816                                        "error\n", dbgcksum_file_name, len, rc);
1817                                 break;
1818                         }
1819                         len -= rc;
1820                         buf += rc;
1821                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1822                                dbgcksum_file_name, rc);
1823                 }
1824                 kunmap(pga[i]->pg);
1825         }
1826
1827         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1828         if (rc)
1829                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1830         filp_close(filp, NULL);
1831 }
1832
1833 static int
1834 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1835                      __u32 client_cksum, __u32 server_cksum,
1836                      struct osc_brw_async_args *aa)
1837 {
1838         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1839         enum cksum_types cksum_type;
1840         obd_dif_csum_fn *fn = NULL;
1841         int sector_size = 0;
1842         __u32 new_cksum;
1843         char *msg;
1844         int rc;
1845
1846         if (server_cksum == client_cksum) {
1847                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1848                 return 0;
1849         }
1850
1851         if (aa->aa_cli->cl_checksum_dump)
1852                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1853                                     server_cksum, client_cksum);
1854
1855         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1856                                            oa->o_flags : 0);
1857
1858         switch (cksum_type) {
1859         case OBD_CKSUM_T10IP512:
1860                 fn = obd_dif_ip_fn;
1861                 sector_size = 512;
1862                 break;
1863         case OBD_CKSUM_T10IP4K:
1864                 fn = obd_dif_ip_fn;
1865                 sector_size = 4096;
1866                 break;
1867         case OBD_CKSUM_T10CRC512:
1868                 fn = obd_dif_crc_fn;
1869                 sector_size = 512;
1870                 break;
1871         case OBD_CKSUM_T10CRC4K:
1872                 fn = obd_dif_crc_fn;
1873                 sector_size = 4096;
1874                 break;
1875         default:
1876                 break;
1877         }
1878
1879         if (fn)
1880                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1881                                              aa->aa_page_count, aa->aa_ppga,
1882                                              OST_WRITE, fn, sector_size,
1883                                              &new_cksum);
1884         else
1885                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1886                                        aa->aa_ppga, OST_WRITE, cksum_type,
1887                                        &new_cksum);
1888
1889         if (rc < 0)
1890                 msg = "failed to calculate the client write checksum";
1891         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1892                 msg = "the server did not use the checksum type specified in "
1893                       "the original request - likely a protocol problem";
1894         else if (new_cksum == server_cksum)
1895                 msg = "changed on the client after we checksummed it - "
1896                       "likely false positive due to mmap IO (bug 11742)";
1897         else if (new_cksum == client_cksum)
1898                 msg = "changed in transit before arrival at OST";
1899         else
1900                 msg = "changed in transit AND doesn't match the original - "
1901                       "likely false positive due to mmap IO (bug 11742)";
1902
1903         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1904                            DFID " object "DOSTID" extent [%llu-%llu], original "
1905                            "client csum %x (type %x), server csum %x (type %x),"
1906                            " client csum now %x\n",
1907                            obd_name, msg, libcfs_nid2str(peer->nid),
1908                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1909                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1910                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1911                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1912                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1913                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1914                            client_cksum,
1915                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1916                            server_cksum, cksum_type, new_cksum);
1917         return 1;
1918 }
1919
/* Note rc enters this function as number of bytes transferred.
 *
 * Common reply processing for bulk READ and WRITE RPCs: unpack the reply
 * body, apply server-side quota and grant updates, then validate the
 * transfer (per-niobuf RCs and checksum for writes; size, checksum and
 * in-place decryption of encrypted pages for reads).
 *
 * Returns 0 on success, -EAGAIN to trigger a resend (bulk unwrap or
 * checksum failure), or another negative errno on unrecoverable error. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	struct client_obd *cli = aa->aa_cli;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;
	const struct lnet_process_id *peer =
		&req->rq_import->imp_connection->c_peer;
	struct ost_body *body;
	u32 client_cksum = 0;
	struct inode *inode;
	unsigned int blockbits = 0, blocksize = 0;

	ENTRY;

	/* -EDQUOT falls through on purpose: the reply still carries quota
	 * flags and grant information that must be applied below. */
	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "cannot unpack body");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid/projid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
		unsigned qid[LL_MAXQUOTAS] = {
					 body->oa.o_uid, body->oa.o_gid,
					 body->oa.o_projid };
		CDEBUG(D_QUOTA,
		       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
		       body->oa.o_valid, body->oa.o_flags);
		       osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
				       body->oa.o_flags);
	}

	/* absorb the grant the server piggybacked on the reply */
	osc_update_grant(cli, body);

	/* quota/grant applied; now the -EDQUOT case can return */
	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		/* a WRITE reply carries status, not a byte count */
		if (rc > 0) {
			CERROR("%s: unexpected positive size %d\n",
			       obd_name, rc);
			RETURN(-EPROTO);
		}

		if (req->rq_bulk != NULL &&
		    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		/* -EAGAIN makes the caller resend so the server can
		 * re-verify the data (see check_write_checksum()) */
		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count, aa->aa_page_count,
				     aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	/* rq_bulk == NULL means short I/O: data came inline in the reply */
	if (req->rq_bulk == NULL) {
		rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
					  RCL_SERVER);
		LASSERT(rc == req->rq_status);
	} else {
		/* if unwrap_bulk failed, return -EAGAIN to retry */
		rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	}
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("%s: unexpected size %d, requested %d\n", obd_name,
		       rc, aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
		       rc, req->rq_bulk->bd_nob_transferred);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk == NULL) {
		/* short io: copy the inline reply buffer back into the
		 * destination pages, page by page */
		int nob, pg_count, i = 0;
		unsigned char *buf;

		CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
		pg_count = aa->aa_page_count;
		buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
						   rc);
		nob = rc;
		while (nob > 0 && pg_count > 0) {
			unsigned char *ptr;
			int count = aa->aa_ppga[i]->count > nob ?
				    nob : aa->aa_ppga[i]->count;

			CDEBUG(D_CACHE, "page %p count %d\n",
			       aa->aa_ppga[i]->pg, count);
			ptr = kmap_atomic(aa->aa_ppga[i]->pg);
			memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
			       count);
			kunmap_atomic((void *) ptr);

			buf += count;
			nob -= count;
			i++;
			pg_count--;
		}
	}

	/* fewer bytes than asked: zero-fill the tail of the page array */
	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		u32        server_cksum = body->oa.o_cksum;
		char      *via = "";
		char      *router = "";
		enum cksum_types cksum_type;
		u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
			body->oa.o_flags : 0;

		/* recompute the checksum over what we actually received and
		 * compare with what the server says it sent */
		cksum_type = obd_cksum_type_unpack(o_flags);
		rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
					  aa->aa_page_count, aa->aa_ppga,
					  OST_READ, &client_cksum);
		if (rc < 0)
			GOTO(out, rc);

		/* mention the LNet router in the error message if the bulk
		 * did not come directly from the server */
		if (req->rq_bulk != NULL &&
		    peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			struct ost_body *clbody;
			u32 page_count = aa->aa_page_count;

			clbody = req_capsule_client_get(&req->rq_pill,
							&RMF_OST_BODY);
			if (cli->cl_checksum_dump)
				dump_all_bulk_pages(&clbody->oa, page_count,
						    aa->aa_ppga, server_cksum,
						    client_cksum);

			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent [%llu-%llu], client %x, "
					   "server %x, cksum_type %x\n",
					   obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_seq : 0ULL,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_oid : 0,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[page_count-1]->off +
					   aa->aa_ppga[page_count-1]->count - 1,
					   client_cksum, server_cksum,
					   cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			/* -EAGAIN: ask the caller to resend the read */
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		/* rate-limit: log only when cksum_missed is a power of two */
		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("%s: checksum %u requested from %s but not sent\n",
			       obd_name, cksum_missed,
			       libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}

	inode = page2inode(aa->aa_ppga[0]->pg);
	if (inode == NULL) {
		/* Try to get reference to inode from cl_page if we are
		 * dealing with direct IO, as handled pages are not
		 * actual page cache pages.
		 */
		struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);

		inode = oap2cl_page(oap)->cp_inode;
		if (inode) {
			/* blockbits != 0 is used below as the direct-IO
			 * marker for the decryption path */
			blockbits = inode->i_blkbits;
			blocksize = 1 << blockbits;
		}
	}
	if (inode && IS_ENCRYPTED(inode)) {
		int idx;

		if (!llcrypt_has_encryption_key(inode)) {
			CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
			GOTO(out, rc);
		}
		/* decrypt received pages in place, one encryption unit
		 * (LUSTRE_ENCRYPTION_UNIT_SIZE) at a time */
		for (idx = 0; idx < aa->aa_page_count; idx++) {
			struct brw_page *pg = aa->aa_ppga[idx];
			unsigned int offs = 0;

			while (offs < PAGE_SIZE) {
				/* do not decrypt if page is all 0s */
				if (memchr_inv(page_address(pg->pg) + offs, 0,
					 LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
					/* if page is empty forward info to
					 * upper layers (ll_io_zero_page) by
					 * clearing PagePrivate2
					 */
					if (!offs)
						ClearPagePrivate2(pg->pg);
					break;
				}

				if (blockbits) {
					/* This is direct IO case. Directly call
					 * decrypt function that takes inode as
					 * input parameter. Page does not need
					 * to be locked.
					 */
					u64 lblk_num =
						((u64)(pg->off >> PAGE_SHIFT) <<
						     (PAGE_SHIFT - blockbits)) +
						       (offs >> blockbits);
					unsigned int i;

					for (i = offs;
					     i < offs +
						    LUSTRE_ENCRYPTION_UNIT_SIZE;
					     i += blocksize, lblk_num++) {
						rc =
						  llcrypt_decrypt_block_inplace(
							  inode, pg->pg,
							  blocksize, i,
							  lblk_num);
						if (rc)
							break;
					}
				} else {
					rc = llcrypt_decrypt_pagecache_blocks(
						pg->pg,
						LUSTRE_ENCRYPTION_UNIT_SIZE,
						offs);
				}
				if (rc)
					GOTO(out, rc);

				offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
			}
		}
	}

out:
	/* copy server-updated attributes back into the caller's obdo;
	 * uses rq_import's connect data (see LU-14283: ocd may be reset) */
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}
2203
/*
 * Rebuild and resend a BRW RPC that failed with a recoverable error
 * (e.g. the server returned -EINPROGRESS, or the resend limit has not
 * been reached yet).
 *
 * A replacement request is prepared from the async args of the failed
 * one; the page array (aa_ppga) and pending async pages/extents
 * (aa_oaps/aa_exts) are moved over to the new request, and the old
 * request's interpret/commit callbacks and import generation are
 * carried across so completion handling and replay detection keep
 * working.  The new request is handed to ptlrpcd for sending.
 *
 * \param[in] request  the failed BRW request
 * \param[in] aa       async args attached to \a request
 * \param[in] rc       error code that triggered the redo
 *
 * \retval 0           on success (new request queued)
 * \retval negative    errno if the replacement request cannot be built
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	/* The below message is checked in replay-ost-single.sh test_8ae*/
	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	/* Sanity check: any async page that still holds a request
	 * reference must reference the request being redone. */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
		}
	}
	/*
	 * New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it...
	 */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_aa, new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* Re-point each async page's request reference at the new
	 * request, dropping the reference held on the old one. */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
2271
2272 /*
2273  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2274  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2275  * fine for our small page arrays and doesn't require allocation.  its an
2276  * insertion sort that swaps elements that are strides apart, shrinking the
2277  * stride down until its '1' and the array is sorted.
2278  */
2279 static void sort_brw_pages(struct brw_page **array, int num)
2280 {
2281         int stride, i, j;
2282         struct brw_page *tmp;
2283
2284         if (num == 1)
2285                 return;
2286         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2287                 ;
2288
2289         do {
2290                 stride /= 3;
2291                 for (i = stride ; i < num ; i++) {
2292                         tmp = array[i];
2293                         j = i;
2294                         while (j >= stride && array[j - stride]->off > tmp->off) {
2295                                 array[j] = array[j - stride];
2296                                 j -= stride;
2297                         }
2298                         array[j] = tmp;
2299                 }
2300         } while (stride > 1);
2301 }
2302
2303 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2304 {
2305         LASSERT(ppga != NULL);
2306         OBD_FREE_PTR_ARRAY(ppga, count);
2307 }
2308
/*
 * Completion callback (rq_interpret_reply) for BRW RPCs.
 *
 * Finalizes the reply, retries recoverable errors through
 * osc_brw_redo_request(), propagates size/blocks/time attributes
 * returned by the server into the cl_object, finishes all extents
 * covered by the RPC, and updates the per-client in-flight RPC
 * accounting before kicking the IO engine again.
 *
 * \retval rc  final status, also handed to the finished extents
 */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_brw_async_args *aa = args;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	unsigned long transferred = 0;

	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);

	/* restore clear text pages */
	osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);

	/*
	 * When server returns -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already.
	 */
	if (osc_recoverable_error(rc) && !req->rq_no_delay) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* import was evicted/reconnected since this RPC was
			 * sent; resend will be handled by recovery */
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		/* rc == 0 means a redo request was queued; this callback
		 * will run again for the new request */
		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	/* on success, fold the attributes returned by the server into the
	 * client-side object under the attr lock */
	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
	aa->aa_oa = NULL;

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	/* finish every extent covered by this RPC; a no-delay failure is
	 * reported as -EWOULDBLOCK so callers can distinguish it */
	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1,
				  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	transferred = (req->rq_bulk == NULL ? /* short io */
		       aa->aa_requested_nob :
		       req->rq_bulk->bd_nob_transferred);

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}
2439
2440 static void brw_commit(struct ptlrpc_request *req)
2441 {
2442         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2443          * this called via the rq_commit_cb, I need to ensure
2444          * osc_dec_unstable_pages is still called. Otherwise unstable
2445          * pages may be leaked. */
2446         spin_lock(&req->rq_lock);
2447         if (likely(req->rq_unstable)) {
2448                 req->rq_unstable = 0;
2449                 spin_unlock(&req->rq_lock);
2450
2451                 osc_dec_unstable_pages(req);
2452         } else {
2453                 req->rq_committed = 1;
2454                 spin_unlock(&req->rq_lock);
2455         }
2456 }
2457
2458 /**
2459  * Build an RPC by the list of extent @ext_list. The caller must ensure
2460  * that the total pages in this list are NOT over max pages per RPC.
2461  * Extents in the list must be in OES_RPC state.
2462  */
2463 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2464                   struct list_head *ext_list, int cmd)
2465 {
2466         struct ptlrpc_request           *req = NULL;
2467         struct osc_extent               *ext;
2468         struct brw_page                 **pga = NULL;
2469         struct osc_brw_async_args       *aa = NULL;
2470         struct obdo                     *oa = NULL;
2471         struct osc_async_page           *oap;
2472         struct osc_object               *obj = NULL;
2473         struct cl_req_attr              *crattr = NULL;
2474         loff_t                          starting_offset = OBD_OBJECT_EOF;
2475         loff_t                          ending_offset = 0;
2476         /* '1' for consistency with code that checks !mpflag to restore */
2477         int mpflag = 1;
2478         int                             mem_tight = 0;
2479         int                             page_count = 0;
2480         bool                            soft_sync = false;
2481         bool                            ndelay = false;
2482         int                             i;
2483         int                             grant = 0;
2484         int                             rc;
2485         __u32                           layout_version = 0;
2486         LIST_HEAD(rpc_list);
2487         struct ost_body                 *body;
2488         ENTRY;
2489         LASSERT(!list_empty(ext_list));
2490
2491         /* add pages into rpc_list to build BRW rpc */
2492         list_for_each_entry(ext, ext_list, oe_link) {
2493                 LASSERT(ext->oe_state == OES_RPC);
2494                 mem_tight |= ext->oe_memalloc;
2495                 grant += ext->oe_grants;
2496                 page_count += ext->oe_nr_pages;
2497                 layout_version = max(layout_version, ext->oe_layout_version);
2498                 if (obj == NULL)
2499                         obj = ext->oe_obj;
2500         }
2501
2502         soft_sync = osc_over_unstable_soft_limit(cli);
2503         if (mem_tight)
2504                 mpflag = memalloc_noreclaim_save();
2505
2506         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2507         if (pga == NULL)
2508                 GOTO(out, rc = -ENOMEM);
2509
2510         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2511         if (oa == NULL)
2512                 GOTO(out, rc = -ENOMEM);
2513
2514         i = 0;
2515         list_for_each_entry(ext, ext_list, oe_link) {
2516                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2517                         if (mem_tight)
2518                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2519                         if (soft_sync)
2520                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2521                         pga[i] = &oap->oap_brw_page;
2522                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2523                         i++;
2524
2525                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2526                         if (starting_offset == OBD_OBJECT_EOF ||
2527                             starting_offset > oap->oap_obj_off)
2528                                 starting_offset = oap->oap_obj_off;
2529                         else
2530                                 LASSERT(oap->oap_page_off == 0);
2531                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2532                                 ending_offset = oap->oap_obj_off +
2533                                                 oap->oap_count;
2534                         else
2535                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2536                                         PAGE_SIZE);
2537                 }
2538                 if (ext->oe_ndelay)
2539                         ndelay = true;
2540         }
2541
2542         /* first page in the list */
2543         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2544
2545         crattr = &osc_env_info(env)->oti_req_attr;
2546         memset(crattr, 0, sizeof(*crattr));
2547         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2548         crattr->cra_flags = ~0ULL;
2549         crattr->cra_page = oap2cl_page(oap);
2550         crattr->cra_oa = oa;
2551         cl_req_attr_set(env, osc2cl(obj), crattr);
2552
2553         if (cmd == OBD_BRW_WRITE) {
2554                 oa->o_grant_used = grant;
2555                 if (layout_version > 0) {
2556                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2557                                PFID(&oa->o_oi.oi_fid), layout_version);
2558
2559                         oa->o_layout_version = layout_version;
2560                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2561                 }
2562         }
2563
2564         sort_brw_pages(pga, page_count);
2565         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2566         if (rc != 0) {
2567                 CERROR("prep_req failed: %d\n", rc);
2568                 GOTO(out, rc);
2569         }
2570
2571         req->rq_commit_cb = brw_commit;
2572         req->rq_interpret_reply = brw_interpret;
2573         req->rq_memalloc = mem_tight != 0;
2574         oap->oap_request = ptlrpc_request_addref(req);
2575         if (ndelay) {
2576                 req->rq_no_resend = req->rq_no_delay = 1;
2577                 /* probably set a shorter timeout value.
2578                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2579                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2580         }
2581
2582         /* Need to update the timestamps after the request is built in case
2583          * we race with setattr (locally or in queue at OST).  If OST gets
2584          * later setattr before earlier BRW (as determined by the request xid),
2585          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2586          * way to do this in a single call.  bug 10150 */
2587         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2588         crattr->cra_oa = &body->oa;
2589         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2590         cl_req_attr_set(env, osc2cl(obj), crattr);
2591         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2592
2593         aa = ptlrpc_req_async_args(aa, req);
2594         INIT_LIST_HEAD(&aa->aa_oaps);
2595         list_splice_init(&rpc_list, &aa->aa_oaps);
2596         INIT_LIST_HEAD(&aa->aa_exts);
2597         list_splice_init(ext_list, &aa->aa_exts);
2598
2599         spin_lock(&cli->cl_loi_list_lock);
2600         starting_offset >>= PAGE_SHIFT;
2601         if (cmd == OBD_BRW_READ) {
2602                 cli->cl_r_in_flight++;
2603                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2604                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2605                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2606                                       starting_offset + 1);
2607         } else {
2608                 cli->cl_w_in_flight++;
2609                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2610                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2611                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2612                                       starting_offset + 1);
2613         }
2614         spin_unlock(&cli->cl_loi_list_lock);
2615
2616         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2617                   page_count, aa, cli->cl_r_in_flight,
2618                   cli->cl_w_in_flight);
2619         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2620
2621         ptlrpcd_add_req(req);
2622         rc = 0;
2623         EXIT;
2624
2625 out:
2626         if (mem_tight)
2627                 memalloc_noreclaim_restore(mpflag);
2628
2629         if (rc != 0) {
2630                 LASSERT(req == NULL);
2631
2632                 if (oa)
2633                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2634                 if (pga) {
2635                         osc_release_bounce_pages(pga, page_count);
2636                         osc_release_ppga(pga, page_count);
2637                 }
2638                 /* this should happen rarely and is pretty bad, it makes the
2639                  * pending list not follow the dirty order */
2640                 while (!list_empty(ext_list)) {
2641                         ext = list_entry(ext_list->next, struct osc_extent,
2642                                          oe_link);
2643                         list_del_init(&ext->oe_link);
2644                         osc_extent_finish(env, ext, 0, rc);
2645                 }
2646         }
2647         RETURN(rc);
2648 }
2649
2650 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2651 {
2652         int set = 0;
2653
2654         LASSERT(lock != NULL);
2655
2656         lock_res_and_lock(lock);
2657
2658         if (lock->l_ast_data == NULL)
2659                 lock->l_ast_data = data;
2660         if (lock->l_ast_data == data)
2661                 set = 1;
2662
2663         unlock_res_and_lock(lock);
2664
2665         return set;
2666 }
2667
/*
 * Finish an OSC lock enqueue: translate the status from an intent
 * reply, mark the LVB ready when appropriate, invoke the caller's
 * upcall with the final status, and release the lock reference taken
 * in ldlm_cli_enqueue().
 *
 * \param[in]     req          the enqueue request (holds the DLM reply)
 * \param[in]     upcall       completion callback to invoke
 * \param[in]     cookie       opaque argument passed to \a upcall
 * \param[in]     lockh        handle of the enqueued lock
 * \param[in]     mode         lock mode the reference was taken with
 * \param[in,out] flags        enqueue flags; LDLM_FL_LVB_READY may be set
 * \param[in]     speculative  true for speculative (AGL-style) enqueues
 * \param[in]     errcode      enqueue result to translate/propagate
 *
 * \retval the upcall's return value
 */
int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
		     void *cookie, struct lustre_handle *lockh,
		     enum ldlm_mode mode, __u64 *flags, bool speculative,
		     int errcode)
{
	bool intent = *flags & LDLM_FL_HAS_INTENT;
	int rc;
	ENTRY;

	/* The request was created before ldlm_cli_enqueue call. */
	if (intent && errcode == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep;

		rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
		LASSERT(rep != NULL);

		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			errcode = rep->lock_policy_res1;
		if (!speculative)
			*flags |= LDLM_FL_LVB_READY;
	} else if (errcode == ELDLM_OK) {
		*flags |= LDLM_FL_LVB_READY;
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, lockh, errcode);

	/* release the reference taken in ldlm_cli_enqueue() */
	if (errcode == ELDLM_LOCK_MATCHED)
		errcode = ELDLM_OK;
	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
		ldlm_lock_decref(lockh, mode);

	RETURN(rc);
}
2705
/*
 * Interpret callback for an asynchronous OSC lock enqueue.
 *
 * Completes the DLM part of the enqueue via ldlm_cli_enqueue_fini()
 * and then the OSC part via osc_enqueue_fini().  An extra lock
 * reference is held across both steps so that a blocking AST posted
 * for a failed lock cannot run before the upcall has executed.
 *
 * \retval status from osc_enqueue_fini() (the upcall's return value)
 */
int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			  void *args, int rc)
{
	struct osc_enqueue_args *aa = args;
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	enum ldlm_mode mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;
	struct ldlm_enqueue_info einfo = {
		.ei_type = aa->oa_type,
		.ei_mode = mode,
	};

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh %#llx, req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* speculative enqueues carry no caller-supplied flags/LVB; use a
	 * local flags word for the fini calls below */
	if (aa->oa_speculative) {
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
				   lvb, lvb_len, lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_speculative, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* drop the reference taken above and the one from handle2lock */
	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
2761
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained.
 *
 * First tries to match an already-granted lock (a PW lock also
 * satisfies a PR request); only if no compatible lock exists is a new
 * enqueue sent, either synchronously or, when \a async is set, queued
 * on \a rqset with osc_enqueue_interpret() as the interpret callback.
 * In all paths the caller's \a upcall is eventually invoked with the
 * final status. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, union ldlm_policy_data *policy,
		     struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
		     void *cookie, struct ldlm_enqueue_info *einfo,
		     struct ptlrpc_request_set *rqset, int async,
		     bool speculative)
{
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_flags = *flags;
	enum ldlm_mode mode;
	int rc;
	ENTRY;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother.  */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	/* Normal lock requests must wait for the LVB to be ready before
	 * matching a lock; speculative lock requests do not need to,
	 * because they will not actually use the lock. */
	if (!speculative)
		match_flags |= LDLM_FL_LVB_READY;
	if (intent != 0)
		match_flags |= LDLM_FL_BLOCK_GRANTED;
	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
			       einfo->ei_type, policy, mode, &lockh);
	if (mode) {
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)
			RETURN(ELDLM_OK);

		matched = ldlm_handle2lock(&lockh);
		if (speculative) {
			/* This DLM lock request is speculative, and does not
			 * have an associated IO request. Therefore if there
			 * is already a DLM lock, it wll just inform the
			 * caller to cancel the request for this stripe.*/
			lock_res_and_lock(matched);
			if (ldlm_extent_equal(&policy->l_extent,
			    &matched->l_policy_data.l_extent))
				rc = -EEXIST;
			else
				rc = -ECANCELED;
			unlock_res_and_lock(matched);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(rc);
		} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
			*flags |= LDLM_FL_LVB_READY;

			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(ELDLM_OK);
		} else {
			/* matched lock belongs to another object; drop it
			 * and fall through to a fresh enqueue */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
		RETURN(-ENOLCK);

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
	if (async) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			aa = ptlrpc_req_async_args(aa, req);
			aa->oa_exp         = exp;
			aa->oa_mode        = einfo->ei_mode;
			aa->oa_type        = einfo->ei_type;
			lustre_handle_copy(&aa->oa_lockh, &lockh);
			aa->oa_upcall      = upcall;
			aa->oa_cookie      = cookie;
			aa->oa_speculative = speculative;
			if (!speculative) {
				aa->oa_flags  = flags;
				aa->oa_lvb    = lvb;
			} else {
				/* speculative locks are essentially to enqueue
				 * a DLM lock  in advance, so we don't care
				 * about the result of the enqueue. */
				aa->oa_lvb    = NULL;
				aa->oa_flags  = NULL;
			}

			req->rq_interpret_reply = osc_enqueue_interpret;
			ptlrpc_set_add_req(rqset, req);
		}
		RETURN(rc);
	}

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, speculative, rc);

	RETURN(rc);
}
2893
/*
 * Match an existing granted extent lock covering \a policy.
 *
 * On a successful match, when \a obj is given, the lock's l_ast_data
 * is bound to the object and the cached LVB is refreshed once; if the
 * lock already belongs to a different object the reference is dropped
 * and 0 is returned as if no lock matched.
 *
 * \retval matched lock mode (non-zero) with a reference held in
 *         \a lockh, or 0 when no compatible lock was found
 */
int osc_match_base(const struct lu_env *env, struct obd_export *exp,
		   struct ldlm_res_id *res_id, enum ldlm_type type,
		   union ldlm_policy_data *policy, enum ldlm_mode mode,
		   __u64 *flags, struct osc_object *obj,
		   struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	enum ldlm_mode rc;
	ENTRY;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		RETURN(-EIO);

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
					res_id, type, policy, mode, lockh,
					match_flags);
	if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
		RETURN(rc);

	if (obj != NULL) {
		struct ldlm_lock *lock = ldlm_handle2lock(lockh);

		LASSERT(lock != NULL);
		if (osc_set_lock_data(lock, obj)) {
			lock_res_and_lock(lock);
			if (!ldlm_is_lvb_cached(lock)) {
				LASSERT(lock->l_ast_data == obj);
				osc_lock_lvb_update(env, obj, lock, NULL);
				ldlm_set_lvb_cached(lock);
			}
			unlock_res_and_lock(lock);
		} else {
			/* lock belongs to another object: pretend no match */
			ldlm_lock_decref(lockh, rc);
			rc = 0;
		}
		LDLM_LOCK_PUT(lock);
	}
	RETURN(rc);
}
2940
/*
 * Interpret callback for an asynchronous OST_STATFS request.
 *
 * Copies the server's obd_statfs into the caller's buffer on success,
 * downgrades connection errors to success for OBD_STATFS_NODELAY
 * callers, and always reports completion through oi_cb_up.
 *
 * \retval value returned by the caller's oi_cb_up callback
 */
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_async_args *aa = args;
	struct obd_statfs *msfs;

	ENTRY;
	if (rc == -EBADR)
		/*
		 * The request has in fact never been sent due to issues at
		 * a higher level (LOV).  Exit immediately since the caller
		 * is aware of the problem and takes care of the clean up.
		 */
		RETURN(rc);

	/* NODELAY callers prefer stale/absent data over waiting for a
	 * reconnect, so treat transient connection errors as success */
	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
		GOTO(out, rc = 0);

	if (rc != 0)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL)
		GOTO(out, rc = -EPROTO);

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

	RETURN(rc);
}
2973
/*
 * Asynchronous statfs: satisfy the request from the cached obd_osfs if it
 * is newer than \a max_age, otherwise queue an OST_STATFS RPC on \a rqset
 * whose reply is handled by osc_statfs_interpret().
 *
 * \param exp		export to query
 * \param oinfo		caller's obd_info; oi_osfs receives the result and
 *			oi_cb_up() is invoked on completion
 * \param max_age	oldest acceptable timestamp for cached statistics
 * \param rqset		request set the new RPC is added to
 *
 * \retval 0		cached data used or RPC queued successfully
 * \retval -ENOMEM	request allocation failed
 * \retval negative	other errno from request packing
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, time64_t max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;
        ENTRY;

        /* Fast path: cached statistics are fresh enough, no RPC needed. */
        if (obd->obd_osfs_age >= max_age) {
                /* NOTE(review): this CDEBUG reads obd_osfs fields without
                 * holding obd_osfs_lock; values may be torn, but the read
                 * is debug-only — confirm this is intentional. */
                CDEBUG(D_SUPER,
                       "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
                       obd->obd_name, &obd->obd_osfs,
                       obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
                       obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
                spin_lock(&obd->obd_osfs_lock);
                memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
                spin_unlock(&obd->obd_osfs_lock);
                oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
                if (oinfo->oi_cb_up)
                        oinfo->oi_cb_up(oinfo, 0);

                RETURN(0);
        }

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the caller's obd_info for osc_statfs_interpret(). */
        req->rq_interpret_reply = osc_statfs_interpret;
        aa = ptlrpc_req_async_args(aa, req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3032
/*
 * Synchronous statfs: send an OST_STATFS RPC and wait for the reply,
 * copying the server's statistics into \a osfs.
 *
 * \param env		execution environment (unused)
 * \param exp		export to query
 * \param osfs		[out] filesystem statistics from the OST
 * \param max_age	not used in this path
 * \param flags		OBD_STATFS_* flags; NODELAY makes the RPC fail
 *			fast instead of waiting for recovery
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;


        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The extra import reference was only needed across the
         * allocation; the request itself keeps the import alive. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);

        *osfs = *msfs;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
3096
/*
 * Handle OSC-specific ioctls: import recovery and import activation.
 *
 * A module reference is taken for the duration of the call so the module
 * cannot be unloaded while the ioctl is in flight.
 *
 * \param cmd	ioctl command (OBD_IOC_CLIENT_RECOVER, IOC_OSC_SET_ACTIVE)
 * \param exp	export the ioctl is addressed to
 * \param len	length of \a karg (unused here)
 * \param karg	kernel-space obd_ioctl_data argument
 * \param uarg	user-space argument (unused here)
 *
 * \retval 0		on success
 * \retval -ENOTTY	for unrecognised commands
 * \retval negative	other errno on failure
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int rc = 0;

        ENTRY;
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                rc = ptlrpc_recover_import(obd->u.cli.cl_import,
                                           data->ioc_inlbuf1, 0);
                /* positive values are informational, not errors */
                if (rc > 0)
                        rc = 0;
                break;
        case IOC_OSC_SET_ACTIVE:
                rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                              data->ioc_offset);
                break;
        default:
                rc = -ENOTTY;
                CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
                       obd->obd_name, cmd, current->comm, rc);
                break;
        }

        module_put(THIS_MODULE);
        return rc;
}
3131
/**
 * Set a named parameter on this OSC, either locally (checksum, sptlrpc,
 * LRU shrink) or by sending an OST_SET_INFO RPC to the server.
 *
 * KEY_GRANT_SHRINK requests are dispatched via ptlrpcd with
 * osc_shrink_grant_interpret() as the completion handler; all other RPCs
 * are added to the caller-supplied request \a set.
 *
 * \param env		execution environment
 * \param exp		export to operate on
 * \param keylen	length of \a key
 * \param key		parameter name
 * \param vallen	length of \a val
 * \param val		parameter value
 * \param set		request set for non-grant-shrink RPCs; must be
 *			non-NULL for those keys
 *
 * \retval 0		on success
 * \retval -EINVAL	on bad value size or missing request set
 * \retval -ENOMEM	on allocation failure
 * \retval negative	other errno from request packing
 */
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                       u32 keylen, void *key, u32 vallen, void *val,
                       struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Keys handled purely on the client, no RPC required: */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this client's LRU pages, and no
                 * more than the caller's remaining target */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how much of the target remains */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* grant shrink carries an ost_body instead of a generic value */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* keep a private copy of the obdo for the interpret
                 * callback; freed by osc_shrink_grant_interpret() */
                aa = ptlrpc_req_async_args(aa, req);
                OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
3232 EXPORT_SYMBOL(osc_set_info_async);
3233
3234 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3235                   struct obd_device *obd, struct obd_uuid *cluuid,
3236                   struct obd_connect_data *data, void *localdata)
3237 {
3238         struct client_obd *cli = &obd->u.cli;
3239
3240         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3241                 long lost_grant;
3242                 long grant;
3243
3244                 spin_lock(&cli->cl_loi_list_lock);
3245                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3246                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3247                         /* restore ocd_grant_blkbits as client page bits */
3248                         data->ocd_grant_blkbits = PAGE_SHIFT;
3249                         grant += cli->cl_dirty_grant;
3250                 } else {
3251                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3252                 }
3253                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3254                 lost_grant = cli->cl_lost_grant;
3255                 cli->cl_lost_grant = 0;
3256                 spin_unlock(&cli->cl_loi_list_lock);
3257
3258                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3259                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3260                        data->ocd_version, data->ocd_grant, lost_grant);
3261         }
3262
3263         RETURN(0);
3264 }
3265 EXPORT_SYMBOL(osc_reconnect);
3266
3267 int osc_disconnect(struct obd_export *exp)
3268 {
3269         struct obd_device *obd = class_exp2obd(exp);
3270         int rc;
3271
3272         rc = client_disconnect_export(exp);
3273         /**
3274          * Initially we put del_shrink_grant before disconnect_export, but it
3275          * causes the following problem if setup (connect) and cleanup
3276          * (disconnect) are tangled together.
3277          *      connect p1                     disconnect p2
3278          *   ptlrpc_connect_import
3279          *     ...............               class_manual_cleanup
3280          *                                     osc_disconnect
3281          *                                     del_shrink_grant
3282          *   ptlrpc_connect_interrupt
3283          *     osc_init_grant
3284          *   add this client to shrink list
3285          *                                      cleanup_osc
3286          * Bang! grant shrink thread trigger the shrink. BUG18662
3287          */
3288         osc_del_grant_list(&obd->u.cli);
3289         return rc;
3290 }
3291 EXPORT_SYMBOL(osc_disconnect);
3292
/**
 * cfs_hash iterator callback: invalidate the osc_object attached to the
 * granted locks of one LDLM resource.
 *
 * Walks the granted list under the resource lock, takes a reference on
 * the first attached osc_object found, clears LDLM_FL_CLEANED on every
 * granted lock, then (outside the lock) invalidates the object.
 *
 * \param hs	namespace resource hash being iterated
 * \param bd	hash bucket descriptor (unused)
 * \param hnode	hash node of the current resource
 * \param arg	struct lu_env for the invalidation
 *
 * \retval 0	always, to continue the iteration
 */
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                                 struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                /* all locks of one resource share the same ast_data, so
                 * one reference on the first object found is enough */
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        /* invalidate outside the resource lock */
        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
3323 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3324
/*
 * React to import state changes: reset grant accounting on disconnect,
 * invalidate cached locks and objects on invalidation, re-initialize
 * grant state when new connect data (OCD) arrives, and forward
 * activation/deactivation events to the observer (LOV).
 *
 * \param obd	the OSC device owning the import
 * \param imp	the import the event applies to
 * \param event	which obd_import_event occurred
 *
 * \retval 0		on success
 * \retval negative	errno from env allocation or the observer
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: all outstanding grant is void */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                /* first pass cancels unused locks locally */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* flush pending I/O, then invalidate the objects
                         * attached to remaining locks; the second cleanup
                         * pass catches locks whose CLEANED flag was reset
                         * by osc_ldlm_resource_invalidate() */
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
                break;
        }
        case IMP_EVENT_OCD: {
                /* NOTE(review): imp_connect_data may be reset by a
                 * concurrent disconnect (see LU-14283) — confirm
                 * osc_init_grant() tolerates a zeroed/reset ocd. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3400
3401 /**
3402  * Determine whether the lock can be canceled before replaying the lock
3403  * during recovery, see bug16774 for detailed information.
3404  *
3405  * \retval zero the lock can't be canceled
3406  * \retval other ok to cancel
3407  */
3408 static int osc_cancel_weight(struct ldlm_lock *lock)
3409 {
3410         /*
3411          * Cancel all unused and granted extent lock.
3412          */
3413         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3414             ldlm_is_granted(lock) &&
3415             osc_ldlm_weigh_ast(lock) == 0)
3416                 RETURN(1);
3417
3418         RETURN(0);
3419 }
3420
3421 static int brw_queue_work(const struct lu_env *env, void *data)
3422 {
3423         struct client_obd *cli = data;
3424
3425         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3426
3427         osc_io_unplug(env, cli, NULL);
3428         RETURN(0);
3429 }
3430
/**
 * Common OSC device setup shared with MDC: take a ptlrpcd reference,
 * initialize the client obd, allocate the writeback and LRU ptlrpcd work
 * items, set up quota, and initialize the grant shrink interval.
 *
 * On failure every resource acquired so far is released via the goto
 * cleanup chain.
 *
 * \param obd	device being set up
 * \param lcfg	configuration record
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        void *handler;
        int rc;

        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);


        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        osc_update_next_shrink(cli);

        RETURN(rc);

out_ptlrpcd_work:
        /* destroy whichever work items were successfully allocated */
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3481 EXPORT_SYMBOL(osc_setup_common);
3482
/*
 * Full OSC device setup: common setup, tunables, request-pool top-up,
 * cancel-weight registration, and grant-shrink list membership.
 *
 * \param obd	device being set up
 * \param lcfg	configuration record
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        int                adding;
        int                added;
        int                req_count;
        int                rc;

        ENTRY;

        rc = osc_setup_common(obd, lcfg);
        if (rc < 0)
                RETURN(rc);

        /* NOTE(review): on osc_tunables_init() failure the resources
         * acquired by osc_setup_common() are not unwound here — confirm
         * the caller's error path performs the cleanup. */
        rc = osc_tunables_init(obd);
        if (rc)
                RETURN(rc);

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        /* let LDLM know which locks are cheap to cancel before replay */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);
        cli->cl_import->imp_idle_timeout = osc_idle_timeout;
        cli->cl_import->imp_idle_debug = D_HA;

        RETURN(0);
}
3526
/**
 * Common pre-cleanup shared with MDC: wait out zombie exports, destroy
 * the writeback and LRU work items, and tear down the client import.
 *
 * \param obd	device being cleaned up
 *
 * \retval 0	always
 */
int osc_precleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        RETURN(0);
}
3555 EXPORT_SYMBOL(osc_precleanup_common);
3556
/*
 * OSC-specific pre-cleanup: run the common teardown, then unregister the
 * device's lprocfs entries.
 *
 * \param obd	device being cleaned up
 *
 * \retval 0	always
 */
static int osc_precleanup(struct obd_device *obd)
{
        ENTRY;

        osc_precleanup_common(obd);

        ptlrpc_lprocfs_unregister_obd(obd);
        RETURN(0);
}
3566
/**
 * Final OSC cleanup shared with MDC: leave the grant-shrink list, detach
 * from the shared page cache, release quota state, tear down the client
 * obd, and drop the ptlrpcd reference taken in osc_setup_common().
 *
 * \param obd	device being cleaned up
 *
 * \retval	result of client_obd_cleanup()
 */
int osc_cleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3597 EXPORT_SYMBOL(osc_cleanup_common);
3598
/* Method table registered with class_register_type(); generic client
 * operations come from the shared client_* helpers, the rest are the
 * OSC implementations defined in this file and osc_request.c peers. */
static const struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup_common,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_quotactl             = osc_quotactl,
};
3620
/* Shrinker registered with the kernel VM to reclaim cached OSC pages. */
static struct shrinker *osc_cache_shrinker;
/* All client_obds eligible for cache shrinking, protected by the lock. */
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
3624
#ifndef HAVE_SHRINKER_COUNT
/* Compatibility wrapper for kernels whose shrinker API uses a single
 * callback instead of separate count_objects/scan_objects: perform one
 * scan pass, then report the remaining cache size. */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif
3637
/*
 * Module init: register kmem caches and the OSC obd type, install the
 * cache shrinker, size and create the shared request pool, and start the
 * grant-shrink worker.  Errors unwind in reverse order via gotos.
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
static int __init osc_init(void)
{
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        /* convert the MB limit to bytes */
        reqpool_size = osc_reqpool_mem_max << 20;

        /* round the request size up to the next power of two */
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in OSC pool according to
         * cl_max_rpcs_in_flight. The allocation from the pool will only be
         * tried after normal allocation failed. So a small OSC pool won't
         * cause much performance degression in most of cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool == NULL)
                GOTO(out_type, rc = -ENOMEM);

        rc = osc_start_grant_work();
        if (rc != 0)
                GOTO(out_req_pool, rc);

        RETURN(rc);

out_req_pool:
        ptlrpc_free_rq_pool(osc_rq_pool);
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);

        RETURN(rc);
}
3703
/* Module exit: undo osc_init() in reverse order — stop the grant worker
 * first so nothing touches the pool or type after they are released. */
static void __exit osc_exit(void)
{
        osc_stop_grant_work();
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
3712
3713 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3714 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3715 MODULE_VERSION(LUSTRE_VERSION_STRING);
3716 MODULE_LICENSE("GPL");
3717
3718 module_init(osc_init);
3719 module_exit(osc_exit);