lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);
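
/*
 * Note: module_param() exposes these under /sys/module/osc/parameters/.
 * With the permissions above, osc_reqpool_mem_max is read-only (0444),
 * while osc_idle_timeout can be tuned at runtime (0644), e.g. by writing
 * a new number of seconds to /sys/module/osc/parameters/osc_idle_timeout.
 */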

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
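
/*
 * Illustrative caller sketch (not from this file): a synchronous caller
 * would typically pass a request set and wait on it, roughly:
 *
 *	struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *	if (set == NULL)
 *		return -ENOMEM;
 *	rc = osc_setattr_async(exp, oa, upcall, cookie, set);
 *	if (rc == 0)
 *		rc = ptlrpc_set_wait(env, set);
 *	ptlrpc_set_destroy(set);
 *
 * With rqset == NULL the request is instead handed to ptlrpcd and the
 * caller never sees the reply.
 */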

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
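
/*
 * Illustrative sketch (field and constant names as declared in
 * lustre_user.h): a caller advising the server that a byte range will be
 * needed soon could fill the header and one advice record roughly like:
 *
 *	struct ladvise_hdr *hdr;
 *
 *	OBD_ALLOC(hdr, offsetof(typeof(*hdr), lah_advise[1]));
 *	hdr->lah_magic = LADVISE_MAGIC;
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLNEED;
 *	hdr->lah_advise[0].lla_start = start;
 *	hdr->lah_advise[0].lla_end = end;
 *	rc = osc_ladvise_base(exp, oa, hdr, upcall, cookie, rqset);
 */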

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
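
/*
 * Note on the punch arguments: callers encode the range to punch in the
 * obdo itself -- by convention oa->o_size carries the start offset and
 * oa->o_blocks the end (OBD_OBJECT_EOF for a truncate), with both
 * OBD_MD_FLSIZE and OBD_MD_FLBLOCKS set in oa->o_valid (see the osc_io
 * setattr path for an example of a caller doing this).
 */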

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the RPC finishes
 * @cookie:     Caller cookie passed back to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (the standard preallocate operation) is supported
 * currently; other mode flags are not supported yet. ftruncate(2) and
 * truncate(2) are supported via the SETATTR request.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (standard prealloc, optionally with
         * FALLOC_FL_KEEP_SIZE) is supported now. Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
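
/*
 * Illustrative mapping (assumed, based on the mode check above): a client
 * fallocate(fd, 0, offset, len) or fallocate(fd, FALLOC_FL_KEEP_SIZE, ...)
 * call is the only shape that reaches the OST here; anything carrying
 * punch, zero-range or similar flags is rejected with -EOPNOTSUPP before
 * an RPC is built.
 */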

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
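
/*
 * Note on the inc/dec pair above: the increment optimistically reserves a
 * slot; if that overshoots the limit, the decrement releases it again. A
 * concurrent task may have decremented in between, in which case our
 * decrement observes a value back under the limit and we must wake a
 * waiter that would otherwise sleep forever on cl_destroy_waitq.
 */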

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
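
/*
 * Worked example for the common branch above (illustrative numbers): with
 * cl_max_pages_per_rpc = 256 (1MB RPCs on 4KB pages) and
 * cl_max_rpcs_in_flight = 8, nrpages = 256 * 9 = 2304 pages, so the client
 * asks for about 9MB of undirty grant, plus one cl_grant_extent_tax per
 * cl_max_extent_pages-sized extent when GRANT_PARAM was negotiated, all
 * capped below OBD_MAX_GRANT.
 */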

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
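
/*
 * Illustrative numbers for the two-step shrink above: with 1MB RPCs and
 * cl_max_rpcs_in_flight = 8, the first shrink drops the grant to
 * (8 + 1) * 1MB = 9MB; once already at or under that, a further shrink
 * goes straight down to a single RPC's worth (1MB).
 */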

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler that returns grant to the server for
 * idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
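
/*
 * Worked example for the GRANT_PARAM branch above (illustrative values):
 * with ocd_grant_blkbits = 16 (64KB server blocks) and 4KB pages,
 * cl_chunkbits = 16, i.e. a chunk covers 16 pages. chunk_mask then clears
 * the low 4 bits, and the "+ ~chunk_mask" term rounds cl_max_pages_per_rpc
 * up to the next multiple of 16 pages before masking.
 */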

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
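
/*
 * Example: two brw_pages with identical flags covering [0, 4096) and
 * [4096, 8192) of the object merge into a single niobuf_remote. A gap
 * between pages or any flag difference starts a new niobuf, and differing
 * flags outside the ignorable mask above additionally trigger the CWARN.
 */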

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots must be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                if (pga[i]->pg->mapping)
                        /* bounce pages are unmapped */
                        continue;
                if (pga[i]->flag & OBD_BRW_SYNC)
                        /* sync transfer cannot have encrypted pages */
                        continue;
                llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}
1369
1370 static int
1371 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1372                      u32 page_count, struct brw_page **pga,
1373                      struct ptlrpc_request **reqp, int resend)
1374 {
1375         struct ptlrpc_request *req;
1376         struct ptlrpc_bulk_desc *desc;
1377         struct ost_body *body;
1378         struct obd_ioobj *ioobj;
1379         struct niobuf_remote *niobuf;
1380         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1381         struct osc_brw_async_args *aa;
1382         struct req_capsule *pill;
1383         struct brw_page *pg_prev;
1384         void *short_io_buf;
1385         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1386         struct inode *inode;
1387
1388         ENTRY;
1389         inode = page2inode(pga[0]->pg);
1390         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1391                 RETURN(-ENOMEM); /* Recoverable */
1392         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1393                 RETURN(-EINVAL); /* Fatal */
1394
1395         if ((cmd & OBD_BRW_WRITE) != 0) {
1396                 opc = OST_WRITE;
1397                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1398                                                 osc_rq_pool,
1399                                                 &RQF_OST_BRW_WRITE);
1400         } else {
1401                 opc = OST_READ;
1402                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1403         }
1404         if (req == NULL)
1405                 RETURN(-ENOMEM);
1406
1407         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1408                 for (i = 0; i < page_count; i++) {
1409                         struct brw_page *pg = pga[i];
1410                         struct page *data_page = NULL;
1411                         bool retried = false;
1412                         bool lockedbymyself;
1413
1414 retry_encrypt:
1415                         /* The page can already be locked when we arrive here.
1416                          * This is possible when cl_page_assume/vvp_page_assume
1417                          * is stuck on wait_on_page_writeback with page lock
1418                          * held. In this case there is no risk for the lock to
1419                          * be released while we are doing our encryption
1420                          * processing, because writeback against that page will
1421                          * end in vvp_page_completion_write/cl_page_completion,
1422                          * which means only once the page is fully processed.
1423                          */
1424                         lockedbymyself = trylock_page(pg->pg);
1425                         data_page =
1426                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1427                                                                  PAGE_SIZE, 0,
1428                                                                  GFP_NOFS);
1429                         if (lockedbymyself)
1430                                 unlock_page(pg->pg);
1431                         if (IS_ERR(data_page)) {
1432                                 rc = PTR_ERR(data_page);
1433                                 if (rc == -ENOMEM && !retried) {
1434                                         retried = true;
1435                                         rc = 0;
1436                                         goto retry_encrypt;
1437                                 }
1438                                 ptlrpc_request_free(req);
1439                                 RETURN(rc);
1440                         }
1441                         pg->pg = data_page;
1442                         /* there should be no gap in the middle of page array */
1443                         if (i == page_count - 1) {
1444                                 struct osc_async_page *oap = brw_page2oap(pg);
1445
1446                                 oa->o_size = oap->oap_count +
1447                                         oap->oap_obj_off + oap->oap_page_off;
1448                         }
1449                         /* len is forced to PAGE_SIZE, and poff to 0
1450                          * so store the old, clear text info
1451                          */
1452                         pg->bp_count_diff = PAGE_SIZE - pg->count;
1453                         pg->count = PAGE_SIZE;
1454                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1455                         pg->off = pg->off & PAGE_MASK;
1456                 }
1457         }
1458
1459         for (niocount = i = 1; i < page_count; i++) {
1460                 if (!can_merge_pages(pga[i - 1], pga[i]))
1461                         niocount++;
1462         }
1463
1464         pill = &req->rq_pill;
1465         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1466                              sizeof(*ioobj));
1467         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1468                              niocount * sizeof(*niobuf));
1469
1470         for (i = 0; i < page_count; i++)
1471                 short_io_size += pga[i]->count;
1472
1473         /* Check if read/write is small enough to be a short io. */
1474         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1475             !imp_connect_shortio(cli->cl_import))
1476                 short_io_size = 0;
1477
1478         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1479                              opc == OST_READ ? 0 : short_io_size);
1480         if (opc == OST_READ)
1481                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1482                                      short_io_size);
1483
1484         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1485         if (rc) {
1486                 ptlrpc_request_free(req);
1487                 RETURN(rc);
1488         }
1489         osc_set_io_portal(req);
1490
1491         ptlrpc_at_set_req_timeout(req);
1492         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1493          * retry logic */
1494         req->rq_no_retry_einprogress = 1;
1495
1496         if (short_io_size != 0) {
1497                 desc = NULL;
1498                 short_io_buf = NULL;
1499                 goto no_bulk;
1500         }
1501
1502         desc = ptlrpc_prep_bulk_imp(req, page_count,
1503                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1504                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1505                         PTLRPC_BULK_PUT_SINK),
1506                 OST_BULK_PORTAL,
1507                 &ptlrpc_bulk_kiov_pin_ops);
1508
1509         if (desc == NULL)
1510                 GOTO(out, rc = -ENOMEM);
1511         /* NB request now owns desc and will free it when it gets freed */
1512 no_bulk:
1513         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1514         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1515         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1516         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1517
1518         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1519
1520         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1521          * and from_kgid(), because they are asynchronous. Fortunately, variable
1522          * oa contains valid o_uid and o_gid in these two operations.
1523          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1524          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1525          * other processing logic */
1526         body->oa.o_uid = oa->o_uid;
1527         body->oa.o_gid = oa->o_gid;
1528
1529         obdo_to_ioobj(oa, ioobj);
1530         ioobj->ioo_bufcnt = niocount;
1531         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1532          * of bulks that might be sent for this request.  The actual number is
1533          * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
1534          * sends "max - 1", both for old clients sending "0" and so the actual
1535          * maximum is a power of two, not one less (sketch below). LU-1431 */
1536         if (desc != NULL)
1537                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1538         else /* short io */
1539                 ioobj_max_brw_set(ioobj, 0);
1540
1541         if (short_io_size != 0) {
1542                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1543                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1544                         body->oa.o_flags = 0;
1545                 }
1546                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1547                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1548                        short_io_size);
1549                 if (opc == OST_WRITE) {
1550                         short_io_buf = req_capsule_client_get(pill,
1551                                                               &RMF_SHORT_IO);
1552                         LASSERT(short_io_buf != NULL);
1553                 }
1554         }
1555
1556         LASSERT(page_count > 0);
1557         pg_prev = pga[0];
1558         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1559                 struct brw_page *pg = pga[i];
1560                 int poff = pg->off & ~PAGE_MASK;
1561
1562                 LASSERT(pg->count > 0);
1563                 /* make sure there is no gap in the middle of page array */
1564                 LASSERTF(page_count == 1 ||
1565                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1566                           ergo(i > 0 && i < page_count - 1,
1567                                poff == 0 && pg->count == PAGE_SIZE)   &&
1568                           ergo(i == page_count - 1, poff == 0)),
1569                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1570                          i, page_count, pg, pg->off, pg->count);
1571                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1572                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1573                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1574                          i, page_count,
1575                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1576                          pg_prev->pg, page_private(pg_prev->pg),
1577                          pg_prev->pg->index, pg_prev->off);
1578                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1579                         (pg->flag & OBD_BRW_SRVLOCK));
1580                 if (short_io_size != 0 && opc == OST_WRITE) {
1581                         unsigned char *ptr = kmap_atomic(pg->pg);
1582
1583                         LASSERT(short_io_size >= requested_nob + pg->count);
1584                         memcpy(short_io_buf + requested_nob,
1585                                ptr + poff,
1586                                pg->count);
1587                         kunmap_atomic(ptr);
1588                 } else if (short_io_size == 0) {
1589                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1590                                                          pg->count);
1591                 }
1592                 requested_nob += pg->count;
1593
1594                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1595                         niobuf--;
1596                         niobuf->rnb_len += pg->count;
1597                 } else {
1598                         niobuf->rnb_offset = pg->off;
1599                         niobuf->rnb_len    = pg->count;
1600                         niobuf->rnb_flags  = pg->flag;
1601                 }
1602                 pg_prev = pg;
1603         }
1604
1605         LASSERTF((void *)(niobuf - niocount) ==
1606                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1607                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1608                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1609
1610         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1611         if (resend) {
1612                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1613                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1614                         body->oa.o_flags = 0;
1615                 }
1616                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1617         }
1618
1619         if (osc_should_shrink_grant(cli))
1620                 osc_shrink_grant_local(cli, &body->oa);
1621
1622         /* size[REQ_REC_OFF] still sizeof (*body) */
1623         if (opc == OST_WRITE) {
1624                 if (cli->cl_checksum &&
1625                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1626                         /* store cl_cksum_type in a local variable since
1627                          * it can be changed via lprocfs */
1628                         enum cksum_types cksum_type = cli->cl_cksum_type;
1629
1630                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1631                                 body->oa.o_flags = 0;
1632
1633                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1634                                                                 cksum_type);
1635                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1636
1637                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1638                                                   requested_nob, page_count,
1639                                                   pga, OST_WRITE,
1640                                                   &body->oa.o_cksum);
1641                         if (rc < 0) {
1642                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1643                                        rc);
1644                                 GOTO(out, rc);
1645                         }
1646                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1647                                body->oa.o_cksum);
1648
1649                         /* save this in 'oa', too, for later checking */
1650                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1651                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1652                                                            cksum_type);
1653                 } else {
1654                         /* clear out the checksum flag, in case this is a
1655                          * resend but cl_checksum is no longer set. b=11238 */
1656                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1657                 }
1658                 oa->o_cksum = body->oa.o_cksum;
1659                 /* 1 RC per niobuf */
1660                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1661                                      sizeof(__u32) * niocount);
1662         } else {
1663                 if (cli->cl_checksum &&
1664                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1665                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1666                                 body->oa.o_flags = 0;
1667                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1668                                 cli->cl_cksum_type);
1669                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1670                 }
1671
1672                 /* Client cksum has already been copied to the wire obdo in the
1673                  * previous lustre_set_wire_obdo(); in case a bulk-read is being
1674                  * resent due to a cksum error, this allows the server to
1675                  * check+dump pages on its side */
1676         }
1677         ptlrpc_request_set_replen(req);
1678
1679         aa = ptlrpc_req_async_args(aa, req);
1680         aa->aa_oa = oa;
1681         aa->aa_requested_nob = requested_nob;
1682         aa->aa_nio_count = niocount;
1683         aa->aa_page_count = page_count;
1684         aa->aa_resends = 0;
1685         aa->aa_ppga = pga;
1686         aa->aa_cli = cli;
1687         INIT_LIST_HEAD(&aa->aa_oaps);
1688
1689         *reqp = req;
1690         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1691         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1692                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1693                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1694         RETURN(0);
1695
1696  out:
1697         ptlrpc_req_finished(req);
1698         RETURN(rc);
1699 }
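/*
 * A minimal sketch of the "max - 1" encoding described above, assuming a
 * hypothetical 16-bit field layout; the example_* helpers are illustrative
 * only, not the wire-protocol macros behind ioobj_max_brw_set():
 */
#if 0
static inline __u32 example_max_brw_pack(__u32 max_brw)
{
	/* a client limit of 8 bulks is stored as 7; an old client that
	 * sends 0 in these bits decodes to a maximum of 1 bulk */
	return (max_brw - 1) << 16;
}

static inline __u32 example_max_brw_unpack(__u32 wire)
{
	return (wire >> 16) + 1;
}
#endif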
1700
1701 char dbgcksum_file_name[PATH_MAX];
1702
1703 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1704                                 struct brw_page **pga, __u32 server_cksum,
1705                                 __u32 client_cksum)
1706 {
1707         struct file *filp;
1708         int rc, i;
1709         unsigned int len;
1710         char *buf;
1711
1712         /* only keep a dump of pages on the first error for the same range in
1713          * the file/fid, not during resends/retries. */
1714         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1715                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1716                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1717                   libcfs_debug_file_path_arr :
1718                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1719                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1720                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1721                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1722                  pga[0]->off,
1723                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1724                  client_cksum, server_cksum);
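	/* e.g. with the default debug path this produces something like
	 * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-4095]-
	 * <client_cksum>-<server_cksum> (FID and extent values illustrative) */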
1725         filp = filp_open(dbgcksum_file_name,
1726                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1727         if (IS_ERR(filp)) {
1728                 rc = PTR_ERR(filp);
1729                 if (rc == -EEXIST)
1730                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1731                                "checksum error: rc = %d\n", dbgcksum_file_name,
1732                                rc);
1733                 else
1734                         CERROR("%s: can't open to dump pages with checksum "
1735                                "error: rc = %d\n", dbgcksum_file_name, rc);
1736                 return;
1737         }
1738
1739         for (i = 0; i < page_count; i++) {
1740                 len = pga[i]->count;
1741                 buf = kmap(pga[i]->pg);
1742                 while (len != 0) {
1743                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1744                         if (rc < 0) {
1745                                 CERROR("%s: wanted to write %u but got error: "
1746                                        "rc = %d\n", dbgcksum_file_name, len, rc);
1747                                 break;
1748                         }
1749                         len -= rc;
1750                         buf += rc;
1751                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1752                                dbgcksum_file_name, rc);
1753                 }
1754                 kunmap(pga[i]->pg);
1755         }
1756
1757         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1758         if (rc)
1759                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1760         filp_close(filp, NULL);
1761 }
1762
1763 static int
1764 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1765                      __u32 client_cksum, __u32 server_cksum,
1766                      struct osc_brw_async_args *aa)
1767 {
1768         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1769         enum cksum_types cksum_type;
1770         obd_dif_csum_fn *fn = NULL;
1771         int sector_size = 0;
1772         __u32 new_cksum;
1773         char *msg;
1774         int rc;
1775
1776         if (server_cksum == client_cksum) {
1777                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1778                 return 0;
1779         }
1780
1781         if (aa->aa_cli->cl_checksum_dump)
1782                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1783                                     server_cksum, client_cksum);
1784
1785         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1786                                            oa->o_flags : 0);
1787
1788         switch (cksum_type) {
1789         case OBD_CKSUM_T10IP512:
1790                 fn = obd_dif_ip_fn;
1791                 sector_size = 512;
1792                 break;
1793         case OBD_CKSUM_T10IP4K:
1794                 fn = obd_dif_ip_fn;
1795                 sector_size = 4096;
1796                 break;
1797         case OBD_CKSUM_T10CRC512:
1798                 fn = obd_dif_crc_fn;
1799                 sector_size = 512;
1800                 break;
1801         case OBD_CKSUM_T10CRC4K:
1802                 fn = obd_dif_crc_fn;
1803                 sector_size = 4096;
1804                 break;
1805         default:
1806                 break;
1807         }
1808
1809         if (fn)
1810                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1811                                              aa->aa_page_count, aa->aa_ppga,
1812                                              OST_WRITE, fn, sector_size,
1813                                              &new_cksum);
1814         else
1815                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1816                                        aa->aa_ppga, OST_WRITE, cksum_type,
1817                                        &new_cksum);
1818
1819         if (rc < 0)
1820                 msg = "failed to calculate the client write checksum";
1821         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1822                 msg = "the server did not use the checksum type specified in "
1823                       "the original request - likely a protocol problem";
1824         else if (new_cksum == server_cksum)
1825                 msg = "changed on the client after we checksummed it - "
1826                       "likely false positive due to mmap IO (bug 11742)";
1827         else if (new_cksum == client_cksum)
1828                 msg = "changed in transit before arrival at OST";
1829         else
1830                 msg = "changed in transit AND doesn't match the original - "
1831                       "likely false positive due to mmap IO (bug 11742)";
1832
1833         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1834                            DFID " object "DOSTID" extent [%llu-%llu], original "
1835                            "client csum %x (type %x), server csum %x (type %x),"
1836                            " client csum now %x\n",
1837                            obd_name, msg, libcfs_nid2str(peer->nid),
1838                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1839                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1840                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1841                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1842                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1843                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1844                            client_cksum,
1845                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1846                            server_cksum, cksum_type, new_cksum);
1847         return 1;
1848 }
1849
1850 /* Note rc enters this function as the number of bytes transferred */
1851 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1852 {
1853         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1854         struct client_obd *cli = aa->aa_cli;
1855         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1856         const struct lnet_process_id *peer =
1857                 &req->rq_import->imp_connection->c_peer;
1858         struct ost_body *body;
1859         u32 client_cksum = 0;
1860         struct inode *inode;
1861
1862         ENTRY;
1863
1864         if (rc < 0 && rc != -EDQUOT) {
1865                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1866                 RETURN(rc);
1867         }
1868
1869         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1870         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1871         if (body == NULL) {
1872                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1873                 RETURN(-EPROTO);
1874         }
1875
1876         /* set/clear over quota flag for a uid/gid/projid */
1877         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1878             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1879                 unsigned qid[LL_MAXQUOTAS] = {
1880                                          body->oa.o_uid, body->oa.o_gid,
1881                                          body->oa.o_projid };
1882                 CDEBUG(D_QUOTA,
1883                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1884                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1885                        body->oa.o_valid, body->oa.o_flags);
1886                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1887                                 body->oa.o_flags);
1888         }
1889
1890         osc_update_grant(cli, body);
1891
1892         if (rc < 0)
1893                 RETURN(rc);
1894
1895         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1896                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1897
1898         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1899                 if (rc > 0) {
1900                         CERROR("%s: unexpected positive size %d\n",
1901                                obd_name, rc);
1902                         RETURN(-EPROTO);
1903                 }
1904
1905                 if (req->rq_bulk != NULL &&
1906                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1907                         RETURN(-EAGAIN);
1908
1909                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1910                     check_write_checksum(&body->oa, peer, client_cksum,
1911                                          body->oa.o_cksum, aa))
1912                         RETURN(-EAGAIN);
1913
1914                 rc = check_write_rcs(req, aa->aa_requested_nob,
1915                                      aa->aa_nio_count, aa->aa_page_count,
1916                                      aa->aa_ppga);
1917                 GOTO(out, rc);
1918         }
1919
1920         /* The rest of this function executes only for OST_READs */
1921
1922         if (req->rq_bulk == NULL) {
1923                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1924                                           RCL_SERVER);
1925                 LASSERT(rc == req->rq_status);
1926         } else {
1927                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1928                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1929         }
1930         if (rc < 0)
1931                 GOTO(out, rc = -EAGAIN);
1932
1933         if (rc > aa->aa_requested_nob) {
1934                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1935                        rc, aa->aa_requested_nob);
1936                 RETURN(-EPROTO);
1937         }
1938
1939         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1940                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1941                        rc, req->rq_bulk->bd_nob_transferred);
1942                 RETURN(-EPROTO);
1943         }
1944
1945         if (req->rq_bulk == NULL) {
1946                 /* short io */
1947                 int nob, pg_count, i = 0;
1948                 unsigned char *buf;
1949
1950                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1951                 pg_count = aa->aa_page_count;
1952                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1953                                                    rc);
1954                 nob = rc;
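		/* scatter the inline (short io) reply buffer back into the
		 * brw pages, at most ->count bytes per page */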
1955                 while (nob > 0 && pg_count > 0) {
1956                         unsigned char *ptr;
1957                         int count = aa->aa_ppga[i]->count > nob ?
1958                                     nob : aa->aa_ppga[i]->count;
1959
1960                         CDEBUG(D_CACHE, "page %p count %d\n",
1961                                aa->aa_ppga[i]->pg, count);
1962                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1963                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1964                                count);
1965                         kunmap_atomic((void *) ptr);
1966
1967                         buf += count;
1968                         nob -= count;
1969                         i++;
1970                         pg_count--;
1971                 }
1972         }
1973
1974         if (rc < aa->aa_requested_nob)
1975                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1976
1977         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1978                 static int cksum_counter;
1979                 u32        server_cksum = body->oa.o_cksum;
1980                 char      *via = "";
1981                 char      *router = "";
1982                 enum cksum_types cksum_type;
1983                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1984                         body->oa.o_flags : 0;
1985
1986                 cksum_type = obd_cksum_type_unpack(o_flags);
1987                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1988                                           aa->aa_page_count, aa->aa_ppga,
1989                                           OST_READ, &client_cksum);
1990                 if (rc < 0)
1991                         GOTO(out, rc);
1992
1993                 if (req->rq_bulk != NULL &&
1994                     peer->nid != req->rq_bulk->bd_sender) {
1995                         via = " via ";
1996                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1997                 }
1998
1999                 if (server_cksum != client_cksum) {
2000                         struct ost_body *clbody;
2001                         u32 page_count = aa->aa_page_count;
2002
2003                         clbody = req_capsule_client_get(&req->rq_pill,
2004                                                         &RMF_OST_BODY);
2005                         if (cli->cl_checksum_dump)
2006                                 dump_all_bulk_pages(&clbody->oa, page_count,
2007                                                     aa->aa_ppga, server_cksum,
2008                                                     client_cksum);
2009
2010                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2011                                            "%s%s%s inode "DFID" object "DOSTID
2012                                            " extent [%llu-%llu], client %x, "
2013                                            "server %x, cksum_type %x\n",
2014                                            obd_name,
2015                                            libcfs_nid2str(peer->nid),
2016                                            via, router,
2017                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2018                                                 clbody->oa.o_parent_seq : 0ULL,
2019                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2020                                                 clbody->oa.o_parent_oid : 0,
2021                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2022                                                 clbody->oa.o_parent_ver : 0,
2023                                            POSTID(&body->oa.o_oi),
2024                                            aa->aa_ppga[0]->off,
2025                                            aa->aa_ppga[page_count-1]->off +
2026                                            aa->aa_ppga[page_count-1]->count - 1,
2027                                            client_cksum, server_cksum,
2028                                            cksum_type);
2029                         cksum_counter = 0;
2030                         aa->aa_oa->o_cksum = client_cksum;
2031                         rc = -EAGAIN;
2032                 } else {
2033                         cksum_counter++;
2034                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2035                         rc = 0;
2036                 }
2037         } else if (unlikely(client_cksum)) {
2038                 static int cksum_missed;
2039
2040                 cksum_missed++;
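		/* log only when cksum_missed is a power of two, giving
		 * exponential backoff of the console message */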
2041                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2042                         CERROR("%s: checksum %u requested from %s but not sent\n",
2043                                obd_name, cksum_missed,
2044                                libcfs_nid2str(peer->nid));
2045         } else {
2046                 rc = 0;
2047         }
2048
2049         inode = page2inode(aa->aa_ppga[0]->pg);
2050         if (inode && IS_ENCRYPTED(inode)) {
2051                 int idx;
2052
2053                 if (!llcrypt_has_encryption_key(inode)) {
2054                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2055                         GOTO(out, rc);
2056                 }
2057                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2058                         struct brw_page *pg = aa->aa_ppga[idx];
2059                         __u64 *p, *q;
2060
2061                         /* do not decrypt if page is all 0s; see sketch below */
2062                         p = q = page_address(pg->pg);
2063                         while (p - q < PAGE_SIZE / sizeof(*p)) {
2064                                 if (*p != 0)
2065                                         break;
2066                                 p++;
2067                         }
2068                         if (p - q == PAGE_SIZE / sizeof(*p)) {
2069                                 /* if page is empty forward info to upper layers
2070                                  * (ll_io_zero_page) by clearing PagePrivate2
2071                                  */
2072                                 ClearPagePrivate2(pg->pg);
2073                                 continue;
2074                         }
2075
2076                         rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2077                                                               PAGE_SIZE, 0);
2078                         if (rc)
2079                                 GOTO(out, rc);
2080                 }
2081         }
2082
2083 out:
2084         if (rc >= 0)
2085                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2086                                      aa->aa_oa, &body->oa);
2087
2088         RETURN(rc);
2089 }
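/*
 * A minimal sketch of the "page is all zeroes" scan used above, assuming the
 * page is kernel-addressable; the example_ name is illustrative only:
 */
#if 0
static bool example_page_is_zero(struct page *page)
{
	__u64 *p = page_address(page);
	int i;

	/* scan the page as 64-bit words; any non-zero word means data */
	for (i = 0; i < PAGE_SIZE / sizeof(*p); i++)
		if (p[i] != 0)
			return false;
	return true;
}
#endif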
2090
2091 static int osc_brw_redo_request(struct ptlrpc_request *request,
2092                                 struct osc_brw_async_args *aa, int rc)
2093 {
2094         struct ptlrpc_request *new_req;
2095         struct osc_brw_async_args *new_aa;
2096         struct osc_async_page *oap;
2097         ENTRY;
2098
2099         /* The below message is checked in replay-ost-single.sh test_8ae */
2100         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2101                   "redo for recoverable error %d", rc);
2102
2103         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2104                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2105                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2106                                   aa->aa_ppga, &new_req, 1);
2107         if (rc)
2108                 RETURN(rc);
2109
2110         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2111                 if (oap->oap_request != NULL) {
2112                         LASSERTF(request == oap->oap_request,
2113                                  "request %p != oap_request %p\n",
2114                                  request, oap->oap_request);
2115                 }
2116         }
2117         /*
2118          * New request takes over pga and oaps from old request.
2119          * Note that copying a list_head doesn't work, need to move it...
2120          */
2121         aa->aa_resends++;
2122         new_req->rq_interpret_reply = request->rq_interpret_reply;
2123         new_req->rq_async_args = request->rq_async_args;
2124         new_req->rq_commit_cb = request->rq_commit_cb;
2125         /* cap resend delay to the current request timeout, this is similar to
2126          * what ptlrpc does (see after_reply()) */
2127         if (aa->aa_resends > new_req->rq_timeout)
2128                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2129         else
2130                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2131         new_req->rq_generation_set = 1;
2132         new_req->rq_import_generation = request->rq_import_generation;
2133
2134         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2135
2136         INIT_LIST_HEAD(&new_aa->aa_oaps);
2137         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2138         INIT_LIST_HEAD(&new_aa->aa_exts);
2139         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2140         new_aa->aa_resends = aa->aa_resends;
2141
2142         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2143                 if (oap->oap_request) {
2144                         ptlrpc_req_finished(oap->oap_request);
2145                         oap->oap_request = ptlrpc_request_addref(new_req);
2146                 }
2147         }
2148
2149         /* XXX: This code will run into problems if we ever support adding
2150          * a series of BRW RPCs into a self-defined ptlrpc_request_set
2151          * and waiting for all of them to finish. We should inherit the
2152          * request set from the old request. */
2153         ptlrpcd_add_req(new_req);
2154
2155         DEBUG_REQ(D_INFO, new_req, "new request");
2156         RETURN(0);
2157 }
2158
2159 /*
2160  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2161  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2162  * fine for our small page arrays and doesn't require allocation.  it's an
2163  * insertion sort that swaps elements that are strides apart, shrinking the
2164  * stride down until it's 1 and the array is sorted (worked example below).
2165  */
2166 static void sort_brw_pages(struct brw_page **array, int num)
2167 {
2168         int stride, i, j;
2169         struct brw_page *tmp;
2170
2171         if (num == 1)
2172                 return;
2173         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2174                 ;
2175
2176         do {
2177                 stride /= 3;
2178                 for (i = stride ; i < num ; i++) {
2179                         tmp = array[i];
2180                         j = i;
2181                         while (j >= stride && array[j - stride]->off > tmp->off) {
2182                                 array[j] = array[j - stride];
2183                                 j -= stride;
2184                         }
2185                         array[j] = tmp;
2186                 }
2187         } while (stride > 1);
2188 }
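/*
 * Worked example of the stride sequence above, assuming num = 100: the first
 * loop grows the stride 1, 4, 13, 40, 121 and stops at 121; the sort then
 * runs with strides 40, 13, 4 and finally 1, at which point it degenerates
 * into a plain insertion sort over an already nearly-sorted array.
 */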
2189
2190 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2191 {
2192         LASSERT(ppga != NULL);
2193         OBD_FREE_PTR_ARRAY(ppga, count);
2194 }
2195
2196 static int brw_interpret(const struct lu_env *env,
2197                          struct ptlrpc_request *req, void *args, int rc)
2198 {
2199         struct osc_brw_async_args *aa = args;
2200         struct osc_extent *ext;
2201         struct osc_extent *tmp;
2202         struct client_obd *cli = aa->aa_cli;
2203         unsigned long transferred = 0;
2204
2205         ENTRY;
2206
2207         rc = osc_brw_fini_request(req, rc);
2208         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2209
2210         /* restore clear text pages */
2211         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2212
2213         /*
2214          * When server returns -EINPROGRESS, client should always retry
2215          * regardless of the number of times the bulk was resent already.
2216          */
2217         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2218                 if (req->rq_import_generation !=
2219                     req->rq_import->imp_generation) {
2220                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2221                                ""DOSTID", rc = %d.\n",
2222                                req->rq_import->imp_obd->obd_name,
2223                                POSTID(&aa->aa_oa->o_oi), rc);
2224                 } else if (rc == -EINPROGRESS ||
2225                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2226                         rc = osc_brw_redo_request(req, aa, rc);
2227                 } else {
2228                         CERROR("%s: too many resent retries for object: "
2229                                "%llu:%llu, rc = %d.\n",
2230                                req->rq_import->imp_obd->obd_name,
2231                                POSTID(&aa->aa_oa->o_oi), rc);
2232                 }
2233
2234                 if (rc == 0)
2235                         RETURN(0);
2236                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2237                         rc = -EIO;
2238         }
2239
2240         if (rc == 0) {
2241                 struct obdo *oa = aa->aa_oa;
2242                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2243                 unsigned long valid = 0;
2244                 struct cl_object *obj;
2245                 struct osc_async_page *last;
2246
2247                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2248                 obj = osc2cl(last->oap_obj);
2249
2250                 cl_object_attr_lock(obj);
2251                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2252                         attr->cat_blocks = oa->o_blocks;
2253                         valid |= CAT_BLOCKS;
2254                 }
2255                 if (oa->o_valid & OBD_MD_FLMTIME) {
2256                         attr->cat_mtime = oa->o_mtime;
2257                         valid |= CAT_MTIME;
2258                 }
2259                 if (oa->o_valid & OBD_MD_FLATIME) {
2260                         attr->cat_atime = oa->o_atime;
2261                         valid |= CAT_ATIME;
2262                 }
2263                 if (oa->o_valid & OBD_MD_FLCTIME) {
2264                         attr->cat_ctime = oa->o_ctime;
2265                         valid |= CAT_CTIME;
2266                 }
2267
2268                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2269                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2270                         loff_t last_off = last->oap_count + last->oap_obj_off +
2271                                 last->oap_page_off;
2272
2273                         /* Change file size if this is an out of quota or
2274                          * direct IO write and it extends the file size */
2275                         if (loi->loi_lvb.lvb_size < last_off) {
2276                                 attr->cat_size = last_off;
2277                                 valid |= CAT_SIZE;
2278                         }
2279                         /* Extend KMS if it's not a lockless write */
2280                         if (loi->loi_kms < last_off &&
2281                             oap2osc_page(last)->ops_srvlock == 0) {
2282                                 attr->cat_kms = last_off;
2283                                 valid |= CAT_KMS;
2284                         }
2285                 }
2286
2287                 if (valid != 0)
2288                         cl_object_attr_update(env, obj, attr, valid);
2289                 cl_object_attr_unlock(obj);
2290         }
2291         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2292         aa->aa_oa = NULL;
2293
2294         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2295                 osc_inc_unstable_pages(req);
2296
2297         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2298                 list_del_init(&ext->oe_link);
2299                 osc_extent_finish(env, ext, 1,
2300                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2301         }
2302         LASSERT(list_empty(&aa->aa_exts));
2303         LASSERT(list_empty(&aa->aa_oaps));
2304
2305         transferred = (req->rq_bulk == NULL ? /* short io */
2306                        aa->aa_requested_nob :
2307                        req->rq_bulk->bd_nob_transferred);
2308
2309         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2310         ptlrpc_lprocfs_brw(req, transferred);
2311
2312         spin_lock(&cli->cl_loi_list_lock);
2313         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2314          * is called so we know whether to go to sync BRWs or wait for more
2315          * RPCs to complete */
2316         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2317                 cli->cl_w_in_flight--;
2318         else
2319                 cli->cl_r_in_flight--;
2320         osc_wake_cache_waiters(cli);
2321         spin_unlock(&cli->cl_loi_list_lock);
2322
2323         osc_io_unplug(env, cli, NULL);
2324         RETURN(rc);
2325 }
2326
2327 static void brw_commit(struct ptlrpc_request *req)
2328 {
2329         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2330          * this function, which is called via the rq_commit_cb, we need to
2331          * ensure osc_dec_unstable_pages is still called. Otherwise unstable
2332          * pages may be leaked. */
2333         spin_lock(&req->rq_lock);
2334         if (likely(req->rq_unstable)) {
2335                 req->rq_unstable = 0;
2336                 spin_unlock(&req->rq_lock);
2337
2338                 osc_dec_unstable_pages(req);
2339         } else {
2340                 req->rq_committed = 1;
2341                 spin_unlock(&req->rq_lock);
2342         }
2343 }
2344
2345 /**
2346  * Build an RPC from the list of extents @ext_list. The caller must ensure
2347  * that the total number of pages in this list does not exceed the max
2348  * pages per RPC. Extents in the list must be in OES_RPC state.
2349  */
2350 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2351                   struct list_head *ext_list, int cmd)
2352 {
2353         struct ptlrpc_request           *req = NULL;
2354         struct osc_extent               *ext;
2355         struct brw_page                 **pga = NULL;
2356         struct osc_brw_async_args       *aa = NULL;
2357         struct obdo                     *oa = NULL;
2358         struct osc_async_page           *oap;
2359         struct osc_object               *obj = NULL;
2360         struct cl_req_attr              *crattr = NULL;
2361         loff_t                          starting_offset = OBD_OBJECT_EOF;
2362         loff_t                          ending_offset = 0;
2363         /* '1' for consistency with code that checks !mpflag to restore */
2364         int mpflag = 1;
2365         int                             mem_tight = 0;
2366         int                             page_count = 0;
2367         bool                            soft_sync = false;
2368         bool                            ndelay = false;
2369         int                             i;
2370         int                             grant = 0;
2371         int                             rc;
2372         __u32                           layout_version = 0;
2373         LIST_HEAD(rpc_list);
2374         struct ost_body                 *body;
2375         ENTRY;
2376         LASSERT(!list_empty(ext_list));
2377
2378         /* add pages into rpc_list to build BRW rpc */
2379         list_for_each_entry(ext, ext_list, oe_link) {
2380                 LASSERT(ext->oe_state == OES_RPC);
2381                 mem_tight |= ext->oe_memalloc;
2382                 grant += ext->oe_grants;
2383                 page_count += ext->oe_nr_pages;
2384                 layout_version = max(layout_version, ext->oe_layout_version);
2385                 if (obj == NULL)
2386                         obj = ext->oe_obj;
2387         }
2388
2389         soft_sync = osc_over_unstable_soft_limit(cli);
2390         if (mem_tight)
2391                 mpflag = memalloc_noreclaim_save();
2392
2393         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2394         if (pga == NULL)
2395                 GOTO(out, rc = -ENOMEM);
2396
2397         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2398         if (oa == NULL)
2399                 GOTO(out, rc = -ENOMEM);
2400
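	/* fill pga[] from the extent pages and compute the overall
	 * [starting_offset, ending_offset) byte range of this RPC */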
2401         i = 0;
2402         list_for_each_entry(ext, ext_list, oe_link) {
2403                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2404                         if (mem_tight)
2405                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2406                         if (soft_sync)
2407                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2408                         pga[i] = &oap->oap_brw_page;
2409                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2410                         i++;
2411
2412                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2413                         if (starting_offset == OBD_OBJECT_EOF ||
2414                             starting_offset > oap->oap_obj_off)
2415                                 starting_offset = oap->oap_obj_off;
2416                         else
2417                                 LASSERT(oap->oap_page_off == 0);
2418                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2419                                 ending_offset = oap->oap_obj_off +
2420                                                 oap->oap_count;
2421                         else
2422                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2423                                         PAGE_SIZE);
2424                 }
2425                 if (ext->oe_ndelay)
2426                         ndelay = true;
2427         }
2428
2429         /* first page in the list */
2430         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2431
2432         crattr = &osc_env_info(env)->oti_req_attr;
2433         memset(crattr, 0, sizeof(*crattr));
2434         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2435         crattr->cra_flags = ~0ULL;
2436         crattr->cra_page = oap2cl_page(oap);
2437         crattr->cra_oa = oa;
2438         cl_req_attr_set(env, osc2cl(obj), crattr);
2439
2440         if (cmd == OBD_BRW_WRITE) {
2441                 oa->o_grant_used = grant;
2442                 if (layout_version > 0) {
2443                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2444                                PFID(&oa->o_oi.oi_fid), layout_version);
2445
2446                         oa->o_layout_version = layout_version;
2447                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2448                 }
2449         }
2450
2451         sort_brw_pages(pga, page_count);
2452         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2453         if (rc != 0) {
2454                 CERROR("prep_req failed: %d\n", rc);
2455                 GOTO(out, rc);
2456         }
2457
2458         req->rq_commit_cb = brw_commit;
2459         req->rq_interpret_reply = brw_interpret;
2460         req->rq_memalloc = mem_tight != 0;
2461         oap->oap_request = ptlrpc_request_addref(req);
2462         if (ndelay) {
2463                 req->rq_no_resend = req->rq_no_delay = 1;
2464                 /* we should probably set a shorter timeout value here
2465                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2466                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2467         }
2468
2469         /* Need to update the timestamps after the request is built in case
2470          * we race with setattr (locally or in queue at the OST).  If the OST
2471          * gets the later setattr before the earlier BRW (as determined by the
2472          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2473          * is no obvious way to do this in a single call.  bug 10150 */
2474         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2475         crattr->cra_oa = &body->oa;
2476         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2477         cl_req_attr_set(env, osc2cl(obj), crattr);
2478         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2479
2480         aa = ptlrpc_req_async_args(aa, req);
2481         INIT_LIST_HEAD(&aa->aa_oaps);
2482         list_splice_init(&rpc_list, &aa->aa_oaps);
2483         INIT_LIST_HEAD(&aa->aa_exts);
2484         list_splice_init(ext_list, &aa->aa_exts);
2485
2486         spin_lock(&cli->cl_loi_list_lock);
2487         starting_offset >>= PAGE_SHIFT;
2488         if (cmd == OBD_BRW_READ) {
2489                 cli->cl_r_in_flight++;
2490                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2491                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2492                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2493                                       starting_offset + 1);
2494         } else {
2495                 cli->cl_w_in_flight++;
2496                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2497                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2498                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2499                                       starting_offset + 1);
2500         }
2501         spin_unlock(&cli->cl_loi_list_lock);
2502
2503         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2504                   page_count, aa, cli->cl_r_in_flight,
2505                   cli->cl_w_in_flight);
2506         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2507
2508         ptlrpcd_add_req(req);
2509         rc = 0;
2510         EXIT;
2511
2512 out:
2513         if (mem_tight)
2514                 memalloc_noreclaim_restore(mpflag);
2515
2516         if (rc != 0) {
2517                 LASSERT(req == NULL);
2518
2519                 if (oa)
2520                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2521                 if (pga) {
2522                         osc_release_bounce_pages(pga, page_count);
2523                         osc_release_ppga(pga, page_count);
2524                 }
2525                 /* this should happen rarely and is pretty bad; it makes the
2526                  * pending list not follow the dirty order */
2527                 while (!list_empty(ext_list)) {
2528                         ext = list_entry(ext_list->next, struct osc_extent,
2529                                          oe_link);
2530                         list_del_init(&ext->oe_link);
2531                         osc_extent_finish(env, ext, 0, rc);
2532                 }
2533         }
2534         RETURN(rc);
2535 }
2536
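/* attach @data to the lock's l_ast_data if it is unset; return 1 if
 * l_ast_data now equals @data, 0 if another owner got there first */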
2537 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2538 {
2539         int set = 0;
2540
2541         LASSERT(lock != NULL);
2542
2543         lock_res_and_lock(lock);
2544
2545         if (lock->l_ast_data == NULL)
2546                 lock->l_ast_data = data;
2547         if (lock->l_ast_data == data)
2548                 set = 1;
2549
2550         unlock_res_and_lock(lock);
2551
2552         return set;
2553 }
2554
2555 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2556                      void *cookie, struct lustre_handle *lockh,
2557                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2558                      int errcode)
2559 {
2560         bool intent = *flags & LDLM_FL_HAS_INTENT;
2561         int rc;
2562         ENTRY;
2563
2564         /* The request was created before ldlm_cli_enqueue call. */
2565         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2566                 struct ldlm_reply *rep;
2567
2568                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2569                 LASSERT(rep != NULL);
2570
2571                 rep->lock_policy_res1 =
2572                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2573                 if (rep->lock_policy_res1)
2574                         errcode = rep->lock_policy_res1;
2575                 if (!speculative)
2576                         *flags |= LDLM_FL_LVB_READY;
2577         } else if (errcode == ELDLM_OK) {
2578                 *flags |= LDLM_FL_LVB_READY;
2579         }
2580
2581         /* Call the update callback. */
2582         rc = (*upcall)(cookie, lockh, errcode);
2583
2584         /* release the reference taken in ldlm_cli_enqueue() */
2585         if (errcode == ELDLM_LOCK_MATCHED)
2586                 errcode = ELDLM_OK;
2587         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2588                 ldlm_lock_decref(lockh, mode);
2589
2590         RETURN(rc);
2591 }
2592
2593 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2594                           void *args, int rc)
2595 {
2596         struct osc_enqueue_args *aa = args;
2597         struct ldlm_lock *lock;
2598         struct lustre_handle *lockh = &aa->oa_lockh;
2599         enum ldlm_mode mode = aa->oa_mode;
2600         struct ost_lvb *lvb = aa->oa_lvb;
2601         __u32 lvb_len = sizeof(*lvb);
2602         __u64 flags = 0;
2603
2604         ENTRY;
2605
2606         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2607          * be valid. */
2608         lock = ldlm_handle2lock(lockh);
2609         LASSERTF(lock != NULL,
2610                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2611                  lockh->cookie, req, aa);
2612
2613         /* Take an additional reference so that a blocking AST that
2614          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2615          * to arrive after an upcall has been executed by
2616          * osc_enqueue_fini(). */
2617         ldlm_lock_addref(lockh, mode);
2618
2619         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2620         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2621
2622         /* Let the CP AST grant the lock first. */
2623         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2624
2625         if (aa->oa_speculative) {
2626                 LASSERT(aa->oa_lvb == NULL);
2627                 LASSERT(aa->oa_flags == NULL);
2628                 aa->oa_flags = &flags;
2629         }
2630
2631         /* Complete obtaining the lock procedure. */
2632         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2633                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2634                                    lockh, rc);
2635         /* Complete osc stuff. */
2636         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2637                               aa->oa_flags, aa->oa_speculative, rc);
2638
2639         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2640
2641         ldlm_lock_decref(lockh, mode);
2642         LDLM_LOCK_PUT(lock);
2643         RETURN(rc);
2644 }
2645
2646 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2647  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2648  * with other synchronous requests; however, keeping some locks and trying to
2649  * obtain others may take a considerable amount of time in the case of OST
2650  * failure, and when a client does not release locks that other sync requests
2651  * need, the client is evicted from the cluster -- such scenarios make life
2652  * difficult, so release locks just after they are obtained. */
2653 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2654                      __u64 *flags, union ldlm_policy_data *policy,
2655                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2656                      void *cookie, struct ldlm_enqueue_info *einfo,
2657                      struct ptlrpc_request_set *rqset, int async,
2658                      bool speculative)
2659 {
2660         struct obd_device *obd = exp->exp_obd;
2661         struct lustre_handle lockh = { 0 };
2662         struct ptlrpc_request *req = NULL;
2663         int intent = *flags & LDLM_FL_HAS_INTENT;
2664         __u64 match_flags = *flags;
2665         enum ldlm_mode mode;
2666         int rc;
2667         ENTRY;
2668
2669         /* Filesystem lock extents are extended to page boundaries so that
2670          * dealing with the page cache is a little smoother.  */
2671         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2672         policy->l_extent.end |= ~PAGE_MASK;
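        /* For illustration, assuming 4 KiB pages (so ~PAGE_MASK == 0xfff):
         * a request for bytes [5000, 6000] is widened to the page-aligned
         * extent [4096, 8191] -- the start is rounded down to its page
         * boundary and the end up to the last byte of its page. */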
2673
2674         /* Next, search for already existing extent locks that will cover us */
2675         /* If we're trying to read, we also search for an existing PW lock.  The
2676          * VFS and page cache already protect us locally, so lots of readers/
2677          * writers can share a single PW lock.
2678          *
2679          * There are problems with conversion deadlocks, so instead of
2680          * converting a read lock to a write lock, we'll just enqueue a new
2681          * one.
2682          *
2683          * At some point we should cancel the read lock instead of making them
2684          * send us a blocking callback, but there are problems with canceling
2685  * locks out from under other users right now, too. */
2686         mode = einfo->ei_mode;
2687         if (einfo->ei_mode == LCK_PR)
2688                 mode |= LCK_PW;
2689         /* Normal lock requests must wait for the LVB to be ready before
2690          * matching a lock; speculative lock requests do not need to,
2691          * because they will not actually use the lock. */
2692         if (!speculative)
2693                 match_flags |= LDLM_FL_LVB_READY;
2694         if (intent != 0)
2695                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2696         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2697                                einfo->ei_type, policy, mode, &lockh, 0);
2698         if (mode) {
2699                 struct ldlm_lock *matched;
2700
2701                 if (*flags & LDLM_FL_TEST_LOCK)
2702                         RETURN(ELDLM_OK);
2703
2704                 matched = ldlm_handle2lock(&lockh);
2705                 if (speculative) {
2706                         /* This DLM lock request is speculative, and does not
2707                          * have an associated IO request. Therefore, if there
2708                          * is already a DLM lock, it will just inform the
2709                          * caller to cancel the request for this stripe. */
2710                         lock_res_and_lock(matched);
2711                         if (ldlm_extent_equal(&policy->l_extent,
2712                             &matched->l_policy_data.l_extent))
2713                                 rc = -EEXIST;
2714                         else
2715                                 rc = -ECANCELED;
2716                         unlock_res_and_lock(matched);
2717
2718                         ldlm_lock_decref(&lockh, mode);
2719                         LDLM_LOCK_PUT(matched);
2720                         RETURN(rc);
2721                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2722                         *flags |= LDLM_FL_LVB_READY;
2723
2724                         /* We already have a lock, and it's referenced. */
2725                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2726
2727                         ldlm_lock_decref(&lockh, mode);
2728                         LDLM_LOCK_PUT(matched);
2729                         RETURN(ELDLM_OK);
2730                 } else {
2731                         ldlm_lock_decref(&lockh, mode);
2732                         LDLM_LOCK_PUT(matched);
2733                 }
2734         }
2735
2736         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2737                 RETURN(-ENOLCK);
2738
2739         if (intent) {
2740                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2741                                            &RQF_LDLM_ENQUEUE_LVB);
2742                 if (req == NULL)
2743                         RETURN(-ENOMEM);
2744
2745                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2746                 if (rc) {
2747                         ptlrpc_request_free(req);
2748                         RETURN(rc);
2749                 }
2750
2751                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2752                                      sizeof(*lvb));
2753                 ptlrpc_request_set_replen(req);
2754         }
2755
2756         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2757         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2758
2759         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2760                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2761         if (async) {
2762                 if (!rc) {
2763                         struct osc_enqueue_args *aa;
2764                         aa = ptlrpc_req_async_args(aa, req);
2765                         aa->oa_exp         = exp;
2766                         aa->oa_mode        = einfo->ei_mode;
2767                         aa->oa_type        = einfo->ei_type;
2768                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2769                         aa->oa_upcall      = upcall;
2770                         aa->oa_cookie      = cookie;
2771                         aa->oa_speculative = speculative;
2772                         if (!speculative) {
2773                                 aa->oa_flags  = flags;
2774                                 aa->oa_lvb    = lvb;
2775                         } else {
2776                                 /* speculative locks essentially enqueue
2777                                  * a DLM lock in advance, so we don't care
2778                                  * about the result of the enqueue. */
2779                                 aa->oa_lvb    = NULL;
2780                                 aa->oa_flags  = NULL;
2781                         }
2782
2783                         req->rq_interpret_reply = osc_enqueue_interpret;
2784                         ptlrpc_set_add_req(rqset, req);
2785                 } else if (intent) {
2786                         ptlrpc_req_finished(req);
2787                 }
2788                 RETURN(rc);
2789         }
2790
2791         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2792                               flags, speculative, rc);
2793         if (intent)
2794                 ptlrpc_req_finished(req);
2795
2796         RETURN(rc);
2797 }
2798
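/**
 * Find an existing extent lock that covers the given extent.
 *
 * A rough summary of the logic below: the extent is widened to page
 * boundaries, and a PR request may also be satisfied by a PW lock, since a
 * write lock protects readers as well. If \a obj is given, it is attached to
 * the matched lock as AST data and the cached LVB is refreshed if needed.
 *
 * \retval the matched lock mode, or 0 if no lock was matched
 */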
2799 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2800                    struct ldlm_res_id *res_id, enum ldlm_type type,
2801                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2802                    __u64 *flags, struct osc_object *obj,
2803                    struct lustre_handle *lockh, int unref)
2804 {
2805         struct obd_device *obd = exp->exp_obd;
2806         __u64 lflags = *flags;
2807         enum ldlm_mode rc;
2808         ENTRY;
2809
2810         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2811                 RETURN(-EIO);
2812
2813         /* Filesystem lock extents are extended to page boundaries so that
2814          * dealing with the page cache is a little smoother */
2815         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2816         policy->l_extent.end |= ~PAGE_MASK;
2817
2818         /* Next, search for already existing extent locks that will cover us */
2819         /* If we're trying to read, we also search for an existing PW lock.  The
2820          * VFS and page cache already protect us locally, so lots of readers/
2821          * writers can share a single PW lock. */
2822         rc = mode;
2823         if (mode == LCK_PR)
2824                 rc |= LCK_PW;
2825         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2826                              res_id, type, policy, rc, lockh, unref);
2827         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2828                 RETURN(rc);
2829
2830         if (obj != NULL) {
2831                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2832
2833                 LASSERT(lock != NULL);
2834                 if (osc_set_lock_data(lock, obj)) {
2835                         lock_res_and_lock(lock);
2836                         if (!ldlm_is_lvb_cached(lock)) {
2837                                 LASSERT(lock->l_ast_data == obj);
2838                                 osc_lock_lvb_update(env, obj, lock, NULL);
2839                                 ldlm_set_lvb_cached(lock);
2840                         }
2841                         unlock_res_and_lock(lock);
2842                 } else {
2843                         ldlm_lock_decref(lockh, rc);
2844                         rc = 0;
2845                 }
2846                 LDLM_LOCK_PUT(lock);
2847         }
2848         RETURN(rc);
2849 }
2850
2851 static int osc_statfs_interpret(const struct lu_env *env,
2852                                 struct ptlrpc_request *req, void *args, int rc)
2853 {
2854         struct osc_async_args *aa = args;
2855         struct obd_statfs *msfs;
2856
2857         ENTRY;
2858         if (rc == -EBADR)
2859                 /*
2860                  * The request has in fact never been sent due to issues at
2861                  * a higher level (LOV).  Exit immediately since the caller
2862                  * is aware of the problem and takes care of the clean up.
2863                  */
2864                 RETURN(rc);
2865
2866         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2867             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2868                 GOTO(out, rc = 0);
2869
2870         if (rc != 0)
2871                 GOTO(out, rc);
2872
2873         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2874         if (msfs == NULL)
2875                 GOTO(out, rc = -EPROTO);
2876
2877         *aa->aa_oi->oi_osfs = *msfs;
2878 out:
2879         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2880
2881         RETURN(rc);
2882 }
2883
2884 static int osc_statfs_async(struct obd_export *exp,
2885                             struct obd_info *oinfo, time64_t max_age,
2886                             struct ptlrpc_request_set *rqset)
2887 {
2888         struct obd_device     *obd = class_exp2obd(exp);
2889         struct ptlrpc_request *req;
2890         struct osc_async_args *aa;
2891         int rc;
2892         ENTRY;
2893
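        /* If the cached filesystem stats are newer than the oldest age the
         * caller will accept, answer from obd_osfs without sending an RPC. */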
2894         if (obd->obd_osfs_age >= max_age) {
2895                 CDEBUG(D_SUPER,
2896                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2897                        obd->obd_name, &obd->obd_osfs,
2898                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2899                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2900                 spin_lock(&obd->obd_osfs_lock);
2901                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2902                 spin_unlock(&obd->obd_osfs_lock);
2903                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2904                 if (oinfo->oi_cb_up)
2905                         oinfo->oi_cb_up(oinfo, 0);
2906
2907                 RETURN(0);
2908         }
2909
2910         /* We could possibly pass max_age in the request (as an absolute
2911          * timestamp or a "seconds.usec ago") so the target can avoid doing
2912          * extra calls into the filesystem if that isn't necessary (e.g.
2913          * during mount that would help a bit).  Having relative timestamps
2914          * is not so great if request processing is slow, while absolute
2915          * timestamps are not ideal because they need time synchronization. */
2916         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2917         if (req == NULL)
2918                 RETURN(-ENOMEM);
2919
2920         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2921         if (rc) {
2922                 ptlrpc_request_free(req);
2923                 RETURN(rc);
2924         }
2925         ptlrpc_request_set_replen(req);
2926         req->rq_request_portal = OST_CREATE_PORTAL;
2927         ptlrpc_at_set_req_timeout(req);
2928
2929         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2930                 /* procfs requests must not be delayed or resent, to avoid deadlock */
2931                 req->rq_no_resend = 1;
2932                 req->rq_no_delay = 1;
2933         }
2934
2935         req->rq_interpret_reply = osc_statfs_interpret;
2936         aa = ptlrpc_req_async_args(aa, req);
2937         aa->aa_oi = oinfo;
2938
2939         ptlrpc_set_add_req(rqset, req);
2940         RETURN(0);
2941 }
2942
2943 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2944                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2945 {
2946         struct obd_device     *obd = class_exp2obd(exp);
2947         struct obd_statfs     *msfs;
2948         struct ptlrpc_request *req;
2949         struct obd_import     *imp = NULL;
2950         int rc;
2951         ENTRY;
2952
2953
2954         /* Since the request might also come from lprocfs, we need to
2955          * sync this with client_disconnect_export(); see bug 15684. */
2956         down_read(&obd->u.cli.cl_sem);
2957         if (obd->u.cli.cl_import)
2958                 imp = class_import_get(obd->u.cli.cl_import);
2959         up_read(&obd->u.cli.cl_sem);
2960         if (!imp)
2961                 RETURN(-ENODEV);
2962
2963         /* We could possibly pass max_age in the request (as an absolute
2964          * timestamp or a "seconds.usec ago") so the target can avoid doing
2965          * extra calls into the filesystem if that isn't necessary (e.g.
2966          * during mount that would help a bit).  Having relative timestamps
2967          * is not so great if request processing is slow, while absolute
2968          * timestamps are not ideal because they need time synchronization. */
2969         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2970
2971         class_import_put(imp);
2972
2973         if (req == NULL)
2974                 RETURN(-ENOMEM);
2975
2976         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2977         if (rc) {
2978                 ptlrpc_request_free(req);
2979                 RETURN(rc);
2980         }
2981         ptlrpc_request_set_replen(req);
2982         req->rq_request_portal = OST_CREATE_PORTAL;
2983         ptlrpc_at_set_req_timeout(req);
2984
2985         if (flags & OBD_STATFS_NODELAY) {
2986                 /* procfs requests must not be delayed or resent, to avoid deadlock */
2987                 req->rq_no_resend = 1;
2988                 req->rq_no_delay = 1;
2989         }
2990
2991         rc = ptlrpc_queue_wait(req);
2992         if (rc)
2993                 GOTO(out, rc);
2994
2995         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2996         if (msfs == NULL)
2997                 GOTO(out, rc = -EPROTO);
2998
2999         *osfs = *msfs;
3000
3001         EXIT;
3002 out:
3003         ptlrpc_req_finished(req);
3004         return rc;
3005 }
3006
3007 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3008                          void *karg, void __user *uarg)
3009 {
3010         struct obd_device *obd = exp->exp_obd;
3011         struct obd_ioctl_data *data = karg;
3012         int rc = 0;
3013
3014         ENTRY;
3015         if (!try_module_get(THIS_MODULE)) {
3016                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3017                        module_name(THIS_MODULE));
3018                 return -EINVAL;
3019         }
3020         switch (cmd) {
3021         case OBD_IOC_CLIENT_RECOVER:
3022                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3023                                            data->ioc_inlbuf1, 0);
3024                 if (rc > 0)
3025                         rc = 0;
3026                 break;
3027         case IOC_OSC_SET_ACTIVE:
3028                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3029                                               data->ioc_offset);
3030                 break;
3031         default:
3032                 rc = -ENOTTY;
3033                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3034                        obd->obd_name, cmd, current->comm, rc);
3035                 break;
3036         }
3037
3038         module_put(THIS_MODULE);
3039         return rc;
3040 }
3041
3042 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3043                        u32 keylen, void *key, u32 vallen, void *val,
3044                        struct ptlrpc_request_set *set)
3045 {
3046         struct ptlrpc_request *req;
3047         struct obd_device     *obd = exp->exp_obd;
3048         struct obd_import     *imp = class_exp2cliimp(exp);
3049         char                  *tmp;
3050         int                    rc;
3051         ENTRY;
3052
3053         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3054
3055         if (KEY_IS(KEY_CHECKSUM)) {
3056                 if (vallen != sizeof(int))
3057                         RETURN(-EINVAL);
3058                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3059                 RETURN(0);
3060         }
3061
3062         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3063                 sptlrpc_conf_client_adapt(obd);
3064                 RETURN(0);
3065         }
3066
3067         if (KEY_IS(KEY_FLUSH_CTX)) {
3068                 sptlrpc_import_flush_my_ctx(imp);
3069                 RETURN(0);
3070         }
3071
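        /* A cache-shrink request: free at most half of the pages currently on
         * this client's LRU (bounded by the caller's target) and report how
         * many were actually freed by decrementing *val. */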
3072         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3073                 struct client_obd *cli = &obd->u.cli;
3074                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3075                 long target = *(long *)val;
3076
3077                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3078                 *(long *)val -= nr;
3079                 RETURN(0);
3080         }
3081
3082         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3083                 RETURN(-EINVAL);
3084
3085         /* We pass all other commands directly to the OST. Since nobody calls
3086            osc methods directly and everybody is supposed to go through LOV,
3087            we assume LOV checked invalid values for us.
3088            The only recognised values so far are evict_by_nid and mds_conn.
3089            Even if something bad goes through, we'd get a -EINVAL from the
3090            OST anyway. */
3091
3092         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3093                                                 &RQF_OST_SET_GRANT_INFO :
3094                                                 &RQF_OBD_SET_INFO);
3095         if (req == NULL)
3096                 RETURN(-ENOMEM);
3097
3098         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3099                              RCL_CLIENT, keylen);
3100         if (!KEY_IS(KEY_GRANT_SHRINK))
3101                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3102                                      RCL_CLIENT, vallen);
3103         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3104         if (rc) {
3105                 ptlrpc_request_free(req);
3106                 RETURN(rc);
3107         }
3108
3109         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3110         memcpy(tmp, key, keylen);
3111         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3112                                                         &RMF_OST_BODY :
3113                                                         &RMF_SETINFO_VAL);
3114         memcpy(tmp, val, vallen);
3115
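        /* For grant shrink requests, stash a copy of the obdo carried in the
         * request body so the interpret callback registered below can use it
         * when the reply (or failure) is processed -- see
         * osc_shrink_grant_interpret(). */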
3116         if (KEY_IS(KEY_GRANT_SHRINK)) {
3117                 struct osc_grant_args *aa;
3118                 struct obdo *oa;
3119
3120                 aa = ptlrpc_req_async_args(aa, req);
3121                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3122                 if (!oa) {
3123                         ptlrpc_req_finished(req);
3124                         RETURN(-ENOMEM);
3125                 }
3126                 *oa = ((struct ost_body *)val)->oa;
3127                 aa->aa_oa = oa;
3128                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3129         }
3130
3131         ptlrpc_request_set_replen(req);
3132         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3133                 LASSERT(set != NULL);
3134                 ptlrpc_set_add_req(set, req);
3135                 ptlrpc_check_set(NULL, set);
3136         } else {
3137                 ptlrpcd_add_req(req);
3138         }
3139
3140         RETURN(0);
3141 }
3142 EXPORT_SYMBOL(osc_set_info_async);
3143
3144 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3145                   struct obd_device *obd, struct obd_uuid *cluuid,
3146                   struct obd_connect_data *data, void *localdata)
3147 {
3148         struct client_obd *cli = &obd->u.cli;
3149
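        /* On reconnect, ask the server for at least as much grant as this
         * client is already consuming (available + reserved + dirty); an idle
         * client falls back to two full RPC-sized chunks of grant. */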
3150         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3151                 long lost_grant;
3152                 long grant;
3153
3154                 spin_lock(&cli->cl_loi_list_lock);
3155                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3156                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3157                         /* restore ocd_grant_blkbits as client page bits */
3158                         data->ocd_grant_blkbits = PAGE_SHIFT;
3159                         grant += cli->cl_dirty_grant;
3160                 } else {
3161                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3162                 }
3163                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3164                 lost_grant = cli->cl_lost_grant;
3165                 cli->cl_lost_grant = 0;
3166                 spin_unlock(&cli->cl_loi_list_lock);
3167
3168                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3169                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3170                        data->ocd_version, data->ocd_grant, lost_grant);
3171         }
3172
3173         RETURN(0);
3174 }
3175 EXPORT_SYMBOL(osc_reconnect);
3176
3177 int osc_disconnect(struct obd_export *exp)
3178 {
3179         struct obd_device *obd = class_exp2obd(exp);
3180         int rc;
3181
3182         rc = client_disconnect_export(exp);
3183         /**
3184          * Initially we put del_shrink_grant before disconnect_export, but it
3185          * causes the following problem if setup (connect) and cleanup
3186          * (disconnect) are tangled together.
3187          *      connect p1                     disconnect p2
3188          *   ptlrpc_connect_import
3189          *     ...............               class_manual_cleanup
3190          *                                     osc_disconnect
3191          *                                     del_shrink_grant
3192          *   ptlrpc_connect_interpret
3193          *     osc_init_grant
3194          *   add this client to shrink list
3195          *                                      cleanup_osc
3196          * Bang! The grant shrink thread triggers the shrink; see bug 18662.
3197          */
3198         osc_del_grant_list(&obd->u.cli);
3199         return rc;
3200 }
3201 EXPORT_SYMBOL(osc_disconnect);
3202
3203 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3204                                  struct hlist_node *hnode, void *arg)
3205 {
3206         struct lu_env *env = arg;
3207         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3208         struct ldlm_lock *lock;
3209         struct osc_object *osc = NULL;
3210         ENTRY;
3211
3212         lock_res(res);
3213         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3214                 if (lock->l_ast_data != NULL && osc == NULL) {
3215                         osc = lock->l_ast_data;
3216                         cl_object_get(osc2cl(osc));
3217                 }
3218
3219                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3220                  * by the 2nd round of the ldlm_namespace_cleanup() call in
3221                  * osc_import_event(). */
3222                 ldlm_clear_cleaned(lock);
3223         }
3224         unlock_res(res);
3225
3226         if (osc != NULL) {
3227                 osc_object_invalidate(env, osc);
3228                 cl_object_put(env, osc2cl(osc));
3229         }
3230
3231         RETURN(0);
3232 }
3233 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3234
3235 static int osc_import_event(struct obd_device *obd,
3236                             struct obd_import *imp,
3237                             enum obd_import_event event)
3238 {
3239         struct client_obd *cli;
3240         int rc = 0;
3241
3242         ENTRY;
3243         LASSERT(imp->imp_obd == obd);
3244
3245         switch (event) {
3246         case IMP_EVENT_DISCON: {
3247                 cli = &obd->u.cli;
3248                 spin_lock(&cli->cl_loi_list_lock);
3249                 cli->cl_avail_grant = 0;
3250                 cli->cl_lost_grant = 0;
3251                 spin_unlock(&cli->cl_loi_list_lock);
3252                 break;
3253         }
3254         case IMP_EVENT_INACTIVE: {
3255                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3256                 break;
3257         }
3258         case IMP_EVENT_INVALIDATE: {
3259                 struct ldlm_namespace *ns = obd->obd_namespace;
3260                 struct lu_env         *env;
3261                 __u16                  refcheck;
3262
3263                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3264
3265                 env = cl_env_get(&refcheck);
3266                 if (!IS_ERR(env)) {
3267                         osc_io_unplug(env, &obd->u.cli, NULL);
3268
3269                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3270                                                  osc_ldlm_resource_invalidate,
3271                                                  env, 0);
3272                         cl_env_put(env, &refcheck);
3273
3274                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3275                 } else
3276                         rc = PTR_ERR(env);
3277                 break;
3278         }
3279         case IMP_EVENT_ACTIVE: {
3280                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3281                 break;
3282         }
3283         case IMP_EVENT_OCD: {
3284                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3285
3286                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3287                         osc_init_grant(&obd->u.cli, ocd);
3288
3289                 /* See bug 7198 */
3290                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3291                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3292
3293                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3294                 break;
3295         }
3296         case IMP_EVENT_DEACTIVATE: {
3297                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3298                 break;
3299         }
3300         case IMP_EVENT_ACTIVATE: {
3301                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3302                 break;
3303         }
3304         default:
3305                 CERROR("Unknown import event %d\n", event);
3306                 LBUG();
3307         }
3308         RETURN(rc);
3309 }
3310
3311 /**
3312  * Determine whether the lock can be canceled before replaying the lock
3313  * during recovery, see bug16774 for detailed information.
3314  *
3315  * \retval zero the lock can't be canceled
3316  * \retval other ok to cancel
3317  */
3318 static int osc_cancel_weight(struct ldlm_lock *lock)
3319 {
3320         /*
3321          * Cancel all unused and granted extent locks.
3322          */
3323         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3324             ldlm_is_granted(lock) &&
3325             osc_ldlm_weigh_ast(lock) == 0)
3326                 RETURN(1);
3327
3328         RETURN(0);
3329 }
3330
3331 static int brw_queue_work(const struct lu_env *env, void *data)
3332 {
3333         struct client_obd *cli = data;
3334
3335         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3336
3337         osc_io_unplug(env, cli, NULL);
3338         RETURN(0);
3339 }
3340
3341 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3342 {
3343         struct client_obd *cli = &obd->u.cli;
3344         void *handler;
3345         int rc;
3346
3347         ENTRY;
3348
3349         rc = ptlrpcd_addref();
3350         if (rc)
3351                 RETURN(rc);
3352
3353         rc = client_obd_setup(obd, lcfg);
3354         if (rc)
3355                 GOTO(out_ptlrpcd, rc);
3356
3357
3358         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3359         if (IS_ERR(handler))
3360                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3361         cli->cl_writeback_work = handler;
3362
3363         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3364         if (IS_ERR(handler))
3365                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3366         cli->cl_lru_work = handler;
3367
3368         rc = osc_quota_setup(obd);
3369         if (rc)
3370                 GOTO(out_ptlrpcd_work, rc);
3371
3372         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3373         osc_update_next_shrink(cli);
3374
3375         RETURN(rc);
3376
3377 out_ptlrpcd_work:
3378         if (cli->cl_writeback_work != NULL) {
3379                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3380                 cli->cl_writeback_work = NULL;
3381         }
3382         if (cli->cl_lru_work != NULL) {
3383                 ptlrpcd_destroy_work(cli->cl_lru_work);
3384                 cli->cl_lru_work = NULL;
3385         }
3386         client_obd_cleanup(obd);
3387 out_ptlrpcd:
3388         ptlrpcd_decref();
3389         RETURN(rc);
3390 }
3391 EXPORT_SYMBOL(osc_setup_common);
3392
3393 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3394 {
3395         struct client_obd *cli = &obd->u.cli;
3396         int                adding;
3397         int                added;
3398         int                req_count;
3399         int                rc;
3400
3401         ENTRY;
3402
3403         rc = osc_setup_common(obd, lcfg);
3404         if (rc < 0)
3405                 RETURN(rc);
3406
3407         rc = osc_tunables_init(obd);
3408         if (rc)
3409                 RETURN(rc);
3410
3411         /*
3412          * We try to control the total number of requests with an upper limit
3413          * osc_reqpool_maxreqcount. There might be some race which will cause
3414          * over-limit allocation, but it is fine.
3415          */
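        /* For example (with a hypothetical cl_max_rpcs_in_flight of 8), each
         * OSC being set up tries to add 8 + 2 = 10 requests to the shared
         * pool, clamped so the pool never exceeds osc_reqpool_maxreqcount. */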
3416         req_count = atomic_read(&osc_pool_req_count);
3417         if (req_count < osc_reqpool_maxreqcount) {
3418                 adding = cli->cl_max_rpcs_in_flight + 2;
3419                 if (req_count + adding > osc_reqpool_maxreqcount)
3420                         adding = osc_reqpool_maxreqcount - req_count;
3421
3422                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3423                 atomic_add(added, &osc_pool_req_count);
3424         }
3425
3426         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3427
3428         spin_lock(&osc_shrink_lock);
3429         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3430         spin_unlock(&osc_shrink_lock);
3431         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3432         cli->cl_import->imp_idle_debug = D_HA;
3433
3434         RETURN(0);
3435 }
3436
3437 int osc_precleanup_common(struct obd_device *obd)
3438 {
3439         struct client_obd *cli = &obd->u.cli;
3440         ENTRY;
3441
3442         /* LU-464
3443          * for echo client, export may be on zombie list, wait for
3444          * zombie thread to cull it, because cli.cl_import will be
3445          * cleared in client_disconnect_export():
3446          *   class_export_destroy() -> obd_cleanup() ->
3447          *   echo_device_free() -> echo_client_cleanup() ->
3448          *   obd_disconnect() -> osc_disconnect() ->
3449          *   client_disconnect_export()
3450          */
3451         obd_zombie_barrier();
3452         if (cli->cl_writeback_work) {
3453                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3454                 cli->cl_writeback_work = NULL;
3455         }
3456
3457         if (cli->cl_lru_work) {
3458                 ptlrpcd_destroy_work(cli->cl_lru_work);
3459                 cli->cl_lru_work = NULL;
3460         }
3461
3462         obd_cleanup_client_import(obd);
3463         RETURN(0);
3464 }
3465 EXPORT_SYMBOL(osc_precleanup_common);
3466
3467 static int osc_precleanup(struct obd_device *obd)
3468 {
3469         ENTRY;
3470
3471         osc_precleanup_common(obd);
3472
3473         ptlrpc_lprocfs_unregister_obd(obd);
3474         RETURN(0);
3475 }
3476
3477 int osc_cleanup_common(struct obd_device *obd)
3478 {
3479         struct client_obd *cli = &obd->u.cli;
3480         int rc;
3481
3482         ENTRY;
3483
3484         spin_lock(&osc_shrink_lock);
3485         list_del(&cli->cl_shrink_list);
3486         spin_unlock(&osc_shrink_lock);
3487
3488         /* lru cleanup */
3489         if (cli->cl_cache != NULL) {
3490                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3491                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3492                 list_del_init(&cli->cl_lru_osc);
3493                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3494                 cli->cl_lru_left = NULL;
3495                 cl_cache_decref(cli->cl_cache);
3496                 cli->cl_cache = NULL;
3497         }
3498
3499         /* free memory of osc quota cache */
3500         osc_quota_cleanup(obd);
3501
3502         rc = client_obd_cleanup(obd);
3503
3504         ptlrpcd_decref();
3505         RETURN(rc);
3506 }
3507 EXPORT_SYMBOL(osc_cleanup_common);
3508
3509 static const struct obd_ops osc_obd_ops = {
3510         .o_owner                = THIS_MODULE,
3511         .o_setup                = osc_setup,
3512         .o_precleanup           = osc_precleanup,
3513         .o_cleanup              = osc_cleanup_common,
3514         .o_add_conn             = client_import_add_conn,
3515         .o_del_conn             = client_import_del_conn,
3516         .o_connect              = client_connect_import,
3517         .o_reconnect            = osc_reconnect,
3518         .o_disconnect           = osc_disconnect,
3519         .o_statfs               = osc_statfs,
3520         .o_statfs_async         = osc_statfs_async,
3521         .o_create               = osc_create,
3522         .o_destroy              = osc_destroy,
3523         .o_getattr              = osc_getattr,
3524         .o_setattr              = osc_setattr,
3525         .o_iocontrol            = osc_iocontrol,
3526         .o_set_info_async       = osc_set_info_async,
3527         .o_import_event         = osc_import_event,
3528         .o_quotactl             = osc_quotactl,
3529 };
3530
3531 static struct shrinker *osc_cache_shrinker;
3532 LIST_HEAD(osc_shrink_list);
3533 DEFINE_SPINLOCK(osc_shrink_lock);
3534
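/* Compatibility for kernels whose shrinker API has a single callback instead
 * of separate count/scan methods: emulate the split API by scanning first and
 * then returning the remaining count. */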
3535 #ifndef HAVE_SHRINKER_COUNT
3536 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3537 {
3538         struct shrink_control scv = {
3539                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3540                 .gfp_mask   = shrink_param(sc, gfp_mask)
3541         };
3542         (void)osc_cache_shrink_scan(shrinker, &scv);
3543
3544         return osc_cache_shrink_count(shrinker, &scv);
3545 }
3546 #endif
3547
3548 static int __init osc_init(void)
3549 {
3550         unsigned int reqpool_size;
3551         unsigned int reqsize;
3552         int rc;
3553         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3554                          osc_cache_shrink_count, osc_cache_shrink_scan);
3555         ENTRY;
3556
3557         /* print an address of _any_ initialized kernel symbol from this
3558          * module, to allow debugging with a gdb that doesn't support data
3559          * symbols from modules. */
3560         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3561
3562         rc = lu_kmem_init(osc_caches);
3563         if (rc)
3564                 RETURN(rc);
3565
3566         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3567                                  LUSTRE_OSC_NAME, &osc_device_type);
3568         if (rc)
3569                 GOTO(out_kmem, rc);
3570
3571         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3572
3573         /* This is obviously too much memory; we only guard against overflow here */
3574         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3575                 GOTO(out_type, rc = -EINVAL);
3576
3577         reqpool_size = osc_reqpool_mem_max << 20;
3578
3579         reqsize = 1;
3580         while (reqsize < OST_IO_MAXREQSIZE)
3581                 reqsize = reqsize << 1;
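        /* reqsize is now the smallest power of two >= OST_IO_MAXREQSIZE.
         * For illustration only: if OST_IO_MAXREQSIZE were, say, 640 KiB,
         * reqsize would round up to 1 MiB, and the default 5 MB cap would
         * allow 5 MiB / 1 MiB = 5 pooled requests. */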
3582
3583         /*
3584          * We don't enlarge the request count in the OSC pool according to
3585          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3586          * after normal allocation has failed, so a small OSC pool won't
3587          * cause much performance degradation in most cases.
3588          */
3589         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3590
3591         atomic_set(&osc_pool_req_count, 0);
3592         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3593                                           ptlrpc_add_rqs_to_pool);
3594
3595         if (osc_rq_pool == NULL)
3596                 GOTO(out_type, rc = -ENOMEM);
3597
3598         rc = osc_start_grant_work();
3599         if (rc != 0)
3600                 GOTO(out_req_pool, rc);
3601
3602         RETURN(rc);
3603
3604 out_req_pool:
3605         ptlrpc_free_rq_pool(osc_rq_pool);
3606 out_type:
3607         class_unregister_type(LUSTRE_OSC_NAME);
3608 out_kmem:
3609         lu_kmem_fini(osc_caches);
3610
3611         RETURN(rc);
3612 }
3613
3614 static void __exit osc_exit(void)
3615 {
3616         osc_stop_grant_work();
3617         remove_shrinker(osc_cache_shrinker);
3618         class_unregister_type(LUSTRE_OSC_NAME);
3619         lu_kmem_fini(osc_caches);
3620         ptlrpc_free_rq_pool(osc_rq_pool);
3621 }
3622
3623 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3624 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3625 MODULE_VERSION(LUSTRE_VERSION_STRING);
3626 MODULE_LICENSE("GPL");
3627
3628 module_init(osc_init);
3629 module_exit(osc_exit);