Whamcloud - gitweb
LU-12275 sec: encryption for write path
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50 #include <linux/falloc.h>
51
52 #include "osc_internal.h"
53
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
57
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
60 module_param(osc_reqpool_mem_max, uint, 0444);
61
62 static int osc_idle_timeout = 20;
63 module_param(osc_idle_timeout, uint, 0644);
64
65 #define osc_grant_args osc_brw_async_args
66
67 struct osc_setattr_args {
68         struct obdo             *sa_oa;
69         obd_enqueue_update_f     sa_upcall;
70         void                    *sa_cookie;
71 };
72
73 struct osc_fsync_args {
74         struct osc_object       *fa_obj;
75         struct obdo             *fa_oa;
76         obd_enqueue_update_f    fa_upcall;
77         void                    *fa_cookie;
78 };
79
80 struct osc_ladvise_args {
81         struct obdo             *la_oa;
82         obd_enqueue_update_f     la_upcall;
83         void                    *la_cookie;
84 };
85
86 static void osc_release_ppga(struct brw_page **ppga, size_t count);
87 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
88                          void *data, int rc);
89
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 {
92         struct ost_body *body;
93
94         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
95         LASSERT(body);
96
97         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
98 }
99
100 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
101                        struct obdo *oa)
102 {
103         struct ptlrpc_request   *req;
104         struct ost_body         *body;
105         int                      rc;
106
107         ENTRY;
108         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109         if (req == NULL)
110                 RETURN(-ENOMEM);
111
112         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113         if (rc) {
114                 ptlrpc_request_free(req);
115                 RETURN(rc);
116         }
117
118         osc_pack_req_body(req, oa);
119
120         ptlrpc_request_set_replen(req);
121
122         rc = ptlrpc_queue_wait(req);
123         if (rc)
124                 GOTO(out, rc);
125
126         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127         if (body == NULL)
128                 GOTO(out, rc = -EPROTO);
129
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
132
133         oa->o_blksize = cli_brw_size(exp->exp_obd);
134         oa->o_valid |= OBD_MD_FLBLKSZ;
135
136         EXIT;
137 out:
138         ptlrpc_req_finished(req);
139
140         return rc;
141 }
142
143 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
144                        struct obdo *oa)
145 {
146         struct ptlrpc_request   *req;
147         struct ost_body         *body;
148         int                      rc;
149
150         ENTRY;
151         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152
153         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154         if (req == NULL)
155                 RETURN(-ENOMEM);
156
157         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158         if (rc) {
159                 ptlrpc_request_free(req);
160                 RETURN(rc);
161         }
162
163         osc_pack_req_body(req, oa);
164
165         ptlrpc_request_set_replen(req);
166
167         rc = ptlrpc_queue_wait(req);
168         if (rc)
169                 GOTO(out, rc);
170
171         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172         if (body == NULL)
173                 GOTO(out, rc = -EPROTO);
174
175         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176
177         EXIT;
178 out:
179         ptlrpc_req_finished(req);
180
181         RETURN(rc);
182 }
183
184 static int osc_setattr_interpret(const struct lu_env *env,
185                                  struct ptlrpc_request *req, void *args, int rc)
186 {
187         struct osc_setattr_args *sa = args;
188         struct ost_body *body;
189
190         ENTRY;
191
192         if (rc != 0)
193                 GOTO(out, rc);
194
195         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196         if (body == NULL)
197                 GOTO(out, rc = -EPROTO);
198
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
200                              &body->oa);
201 out:
202         rc = sa->sa_upcall(sa->sa_cookie, rc);
203         RETURN(rc);
204 }
205
206 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
207                       obd_enqueue_update_f upcall, void *cookie,
208                       struct ptlrpc_request_set *rqset)
209 {
210         struct ptlrpc_request   *req;
211         struct osc_setattr_args *sa;
212         int                      rc;
213
214         ENTRY;
215
216         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217         if (req == NULL)
218                 RETURN(-ENOMEM);
219
220         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221         if (rc) {
222                 ptlrpc_request_free(req);
223                 RETURN(rc);
224         }
225
226         osc_pack_req_body(req, oa);
227
228         ptlrpc_request_set_replen(req);
229
230         /* do mds to ost setattr asynchronously */
231         if (!rqset) {
232                 /* Do not wait for response. */
233                 ptlrpcd_add_req(req);
234         } else {
235                 req->rq_interpret_reply = osc_setattr_interpret;
236
237                 sa = ptlrpc_req_async_args(sa, req);
238                 sa->sa_oa = oa;
239                 sa->sa_upcall = upcall;
240                 sa->sa_cookie = cookie;
241
242                 ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         ptlrpc_set_add_req(rqset, req);
328
329         RETURN(0);
330 }
331
332 static int osc_create(const struct lu_env *env, struct obd_export *exp,
333                       struct obdo *oa)
334 {
335         struct ptlrpc_request *req;
336         struct ost_body       *body;
337         int                    rc;
338         ENTRY;
339
340         LASSERT(oa != NULL);
341         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
342         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343
344         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345         if (req == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349         if (rc) {
350                 ptlrpc_request_free(req);
351                 GOTO(out, rc);
352         }
353
354         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
355         LASSERT(body);
356
357         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358
359         ptlrpc_request_set_replen(req);
360
361         rc = ptlrpc_queue_wait(req);
362         if (rc)
363                 GOTO(out_req, rc);
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL)
367                 GOTO(out_req, rc = -EPROTO);
368
369         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371
372         oa->o_blksize = cli_brw_size(exp->exp_obd);
373         oa->o_valid |= OBD_MD_FLBLKSZ;
374
375         CDEBUG(D_HA, "transno: %lld\n",
376                lustre_msg_get_transno(req->rq_repmsg));
377 out_req:
378         ptlrpc_req_finished(req);
379 out:
380         RETURN(rc);
381 }
382
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384                    obd_enqueue_update_f upcall, void *cookie)
385 {
386         struct ptlrpc_request *req;
387         struct osc_setattr_args *sa;
388         struct obd_import *imp = class_exp2cliimp(exp);
389         struct ost_body *body;
390         int rc;
391
392         ENTRY;
393
394         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395         if (req == NULL)
396                 RETURN(-ENOMEM);
397
398         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399         if (rc < 0) {
400                 ptlrpc_request_free(req);
401                 RETURN(rc);
402         }
403
404         osc_set_io_portal(req);
405
406         ptlrpc_at_set_req_timeout(req);
407
408         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409
410         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411
412         ptlrpc_request_set_replen(req);
413
414         req->rq_interpret_reply = osc_setattr_interpret;
415         sa = ptlrpc_req_async_args(sa, req);
416         sa->sa_oa = oa;
417         sa->sa_upcall = upcall;
418         sa->sa_cookie = cookie;
419
420         ptlrpcd_add_req(req);
421
422         RETURN(0);
423 }
424 EXPORT_SYMBOL(osc_punch_send);
425
426 /**
427  * osc_fallocate_base() - Handles fallocate request.
428  *
429  * @exp:        Export structure
430  * @oa:         Attributes passed to OSS from client (obdo structure)
431  * @upcall:     Primary & supplementary group information
432  * @cookie:     Exclusive identifier
433  * @rqset:      Request list.
434  * @mode:       Operation done on given range.
435  *
436  * osc_fallocate_base() - Handles fallocate requests only. Only block
437  * allocation or standard preallocate operation is supported currently.
438  * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
439  * is supported via SETATTR request.
440  *
441  * Return: Non-zero on failure and O on success.
442  */
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444                        obd_enqueue_update_f upcall, void *cookie, int mode)
445 {
446         struct ptlrpc_request *req;
447         struct osc_setattr_args *sa;
448         struct ost_body *body;
449         struct obd_import *imp = class_exp2cliimp(exp);
450         int rc;
451         ENTRY;
452
453         /*
454          * Only mode == 0 (which is standard prealloc) is supported now.
455          * Punch is not supported yet.
456          */
457         if (mode & ~FALLOC_FL_KEEP_SIZE)
458                 RETURN(-EOPNOTSUPP);
459         oa->o_falloc_mode = mode;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                    &RQF_OST_FALLOCATE);
463         if (req == NULL)
464                 RETURN(-ENOMEM);
465
466         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
467         if (rc != 0) {
468                 ptlrpc_request_free(req);
469                 RETURN(rc);
470         }
471
472         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
473         LASSERT(body);
474
475         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
476
477         ptlrpc_request_set_replen(req);
478
479         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
480         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481         sa = ptlrpc_req_async_args(sa, req);
482         sa->sa_oa = oa;
483         sa->sa_upcall = upcall;
484         sa->sa_cookie = cookie;
485
486         ptlrpcd_add_req(req);
487
488         RETURN(0);
489 }
490
491 static int osc_sync_interpret(const struct lu_env *env,
492                               struct ptlrpc_request *req, void *args, int rc)
493 {
494         struct osc_fsync_args *fa = args;
495         struct ost_body *body;
496         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
497         unsigned long valid = 0;
498         struct cl_object *obj;
499         ENTRY;
500
501         if (rc != 0)
502                 GOTO(out, rc);
503
504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
505         if (body == NULL) {
506                 CERROR("can't unpack ost_body\n");
507                 GOTO(out, rc = -EPROTO);
508         }
509
510         *fa->fa_oa = body->oa;
511         obj = osc2cl(fa->fa_obj);
512
513         /* Update osc object's blocks attribute */
514         cl_object_attr_lock(obj);
515         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
516                 attr->cat_blocks = body->oa.o_blocks;
517                 valid |= CAT_BLOCKS;
518         }
519
520         if (valid != 0)
521                 cl_object_attr_update(env, obj, attr, valid);
522         cl_object_attr_unlock(obj);
523
524 out:
525         rc = fa->fa_upcall(fa->fa_cookie, rc);
526         RETURN(rc);
527 }
528
529 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
530                   obd_enqueue_update_f upcall, void *cookie,
531                   struct ptlrpc_request_set *rqset)
532 {
533         struct obd_export     *exp = osc_export(obj);
534         struct ptlrpc_request *req;
535         struct ost_body       *body;
536         struct osc_fsync_args *fa;
537         int                    rc;
538         ENTRY;
539
540         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
541         if (req == NULL)
542                 RETURN(-ENOMEM);
543
544         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
545         if (rc) {
546                 ptlrpc_request_free(req);
547                 RETURN(rc);
548         }
549
550         /* overload the size and blocks fields in the oa with start/end */
551         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
552         LASSERT(body);
553         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
554
555         ptlrpc_request_set_replen(req);
556         req->rq_interpret_reply = osc_sync_interpret;
557
558         fa = ptlrpc_req_async_args(fa, req);
559         fa->fa_obj = obj;
560         fa->fa_oa = oa;
561         fa->fa_upcall = upcall;
562         fa->fa_cookie = cookie;
563
564         ptlrpc_set_add_req(rqset, req);
565
566         RETURN (0);
567 }
568
569 /* Find and cancel locally locks matched by @mode in the resource found by
570  * @objid. Found locks are added into @cancel list. Returns the amount of
571  * locks added to @cancels list. */
572 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
573                                    struct list_head *cancels,
574                                    enum ldlm_mode mode, __u64 lock_flags)
575 {
576         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
577         struct ldlm_res_id res_id;
578         struct ldlm_resource *res;
579         int count;
580         ENTRY;
581
582         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
583          * export) but disabled through procfs (flag in NS).
584          *
585          * This distinguishes from a case when ELC is not supported originally,
586          * when we still want to cancel locks in advance and just cancel them
587          * locally, without sending any RPC. */
588         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
589                 RETURN(0);
590
591         ostid_build_res_name(&oa->o_oi, &res_id);
592         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
593         if (IS_ERR(res))
594                 RETURN(0);
595
596         LDLM_RESOURCE_ADDREF(res);
597         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
598                                            lock_flags, 0, NULL);
599         LDLM_RESOURCE_DELREF(res);
600         ldlm_resource_putref(res);
601         RETURN(count);
602 }
603
604 static int osc_destroy_interpret(const struct lu_env *env,
605                                  struct ptlrpc_request *req, void *args, int rc)
606 {
607         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
608
609         atomic_dec(&cli->cl_destroy_in_flight);
610         wake_up(&cli->cl_destroy_waitq);
611
612         return 0;
613 }
614
615 static int osc_can_send_destroy(struct client_obd *cli)
616 {
617         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
618             cli->cl_max_rpcs_in_flight) {
619                 /* The destroy request can be sent */
620                 return 1;
621         }
622         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
623             cli->cl_max_rpcs_in_flight) {
624                 /*
625                  * The counter has been modified between the two atomic
626                  * operations.
627                  */
628                 wake_up(&cli->cl_destroy_waitq);
629         }
630         return 0;
631 }
632
633 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
634                        struct obdo *oa)
635 {
636         struct client_obd     *cli = &exp->exp_obd->u.cli;
637         struct ptlrpc_request *req;
638         struct ost_body       *body;
639         LIST_HEAD(cancels);
640         int rc, count;
641         ENTRY;
642
643         if (!oa) {
644                 CDEBUG(D_INFO, "oa NULL\n");
645                 RETURN(-EINVAL);
646         }
647
648         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
649                                         LDLM_FL_DISCARD_DATA);
650
651         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
652         if (req == NULL) {
653                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
654                 RETURN(-ENOMEM);
655         }
656
657         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
658                                0, &cancels, count);
659         if (rc) {
660                 ptlrpc_request_free(req);
661                 RETURN(rc);
662         }
663
664         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
665         ptlrpc_at_set_req_timeout(req);
666
667         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
668         LASSERT(body);
669         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
670
671         ptlrpc_request_set_replen(req);
672
673         req->rq_interpret_reply = osc_destroy_interpret;
674         if (!osc_can_send_destroy(cli)) {
675                 /*
676                  * Wait until the number of on-going destroy RPCs drops
677                  * under max_rpc_in_flight
678                  */
679                 rc = l_wait_event_abortable_exclusive(
680                         cli->cl_destroy_waitq,
681                         osc_can_send_destroy(cli));
682                 if (rc) {
683                         ptlrpc_req_finished(req);
684                         RETURN(-EINTR);
685                 }
686         }
687
688         /* Do not wait for response */
689         ptlrpcd_add_req(req);
690         RETURN(0);
691 }
692
693 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
694                                 long writing_bytes)
695 {
696         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
697
698         LASSERT(!(oa->o_valid & bits));
699
700         oa->o_valid |= bits;
701         spin_lock(&cli->cl_loi_list_lock);
702         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
703                 oa->o_dirty = cli->cl_dirty_grant;
704         else
705                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
706         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
707                 CERROR("dirty %lu > dirty_max %lu\n",
708                        cli->cl_dirty_pages,
709                        cli->cl_dirty_max_pages);
710                 oa->o_undirty = 0;
711         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
712                             (long)(obd_max_dirty_pages + 1))) {
713                 /* The atomic_read() allowing the atomic_inc() are
714                  * not covered by a lock thus they may safely race and trip
715                  * this CERROR() unless we add in a small fudge factor (+1). */
716                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
717                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long nrpages;
727                 unsigned long undirty;
728
729                 nrpages = cli->cl_max_pages_per_rpc;
730                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
731                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
732                 undirty = nrpages << PAGE_SHIFT;
733                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
734                                  GRANT_PARAM)) {
735                         int nrextents;
736
737                         /* take extent tax into account when asking for more
738                          * grant space */
739                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
740                                      cli->cl_max_extent_pages;
741                         undirty += nrextents * cli->cl_grant_extent_tax;
742                 }
743                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
744                  * to add extent tax, etc.
745                  */
746                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
747                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
748         }
749         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
750         oa->o_dropped = cli->cl_lost_grant;
751         cli->cl_lost_grant = 0;
752         spin_unlock(&cli->cl_loi_list_lock);
753         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
754                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
755 }
756
757 void osc_update_next_shrink(struct client_obd *cli)
758 {
759         cli->cl_next_shrink_grant = ktime_get_seconds() +
760                                     cli->cl_grant_shrink_interval;
761
762         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
763                cli->cl_next_shrink_grant);
764 }
765
766 static void __osc_update_grant(struct client_obd *cli, u64 grant)
767 {
768         spin_lock(&cli->cl_loi_list_lock);
769         cli->cl_avail_grant += grant;
770         spin_unlock(&cli->cl_loi_list_lock);
771 }
772
773 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 {
775         if (body->oa.o_valid & OBD_MD_FLGRANT) {
776                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
777                 __osc_update_grant(cli, body->oa.o_grant);
778         }
779 }
780
781 /**
782  * grant thread data for shrinking space.
783  */
784 struct grant_thread_data {
785         struct list_head        gtd_clients;
786         struct mutex            gtd_mutex;
787         unsigned long           gtd_stopped:1;
788 };
789 static struct grant_thread_data client_gtd;
790
791 static int osc_shrink_grant_interpret(const struct lu_env *env,
792                                       struct ptlrpc_request *req,
793                                       void *args, int rc)
794 {
795         struct osc_grant_args *aa = args;
796         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
797         struct ost_body *body;
798
799         if (rc != 0) {
800                 __osc_update_grant(cli, aa->aa_oa->o_grant);
801                 GOTO(out, rc);
802         }
803
804         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
805         LASSERT(body);
806         osc_update_grant(cli, body);
807 out:
808         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
809         aa->aa_oa = NULL;
810
811         return rc;
812 }
813
814 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
815 {
816         spin_lock(&cli->cl_loi_list_lock);
817         oa->o_grant = cli->cl_avail_grant / 4;
818         cli->cl_avail_grant -= oa->o_grant;
819         spin_unlock(&cli->cl_loi_list_lock);
820         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
821                 oa->o_valid |= OBD_MD_FLFLAGS;
822                 oa->o_flags = 0;
823         }
824         oa->o_flags |= OBD_FL_SHRINK_GRANT;
825         osc_update_next_shrink(cli);
826 }
827
828 /* Shrink the current grant, either from some large amount to enough for a
829  * full set of in-flight RPCs, or if we have already shrunk to that limit
830  * then to enough for a single RPC.  This avoids keeping more grant than
831  * needed, and avoids shrinking the grant piecemeal. */
832 static int osc_shrink_grant(struct client_obd *cli)
833 {
834         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
835                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
836
837         spin_lock(&cli->cl_loi_list_lock);
838         if (cli->cl_avail_grant <= target_bytes)
839                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
840         spin_unlock(&cli->cl_loi_list_lock);
841
842         return osc_shrink_grant_to_target(cli, target_bytes);
843 }
844
845 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
846 {
847         int                     rc = 0;
848         struct ost_body        *body;
849         ENTRY;
850
851         spin_lock(&cli->cl_loi_list_lock);
852         /* Don't shrink if we are already above or below the desired limit
853          * We don't want to shrink below a single RPC, as that will negatively
854          * impact block allocation and long-term performance. */
855         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
856                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
857
858         if (target_bytes >= cli->cl_avail_grant) {
859                 spin_unlock(&cli->cl_loi_list_lock);
860                 RETURN(0);
861         }
862         spin_unlock(&cli->cl_loi_list_lock);
863
864         OBD_ALLOC_PTR(body);
865         if (!body)
866                 RETURN(-ENOMEM);
867
868         osc_announce_cached(cli, &body->oa, 0);
869
870         spin_lock(&cli->cl_loi_list_lock);
871         if (target_bytes >= cli->cl_avail_grant) {
872                 /* available grant has changed since target calculation */
873                 spin_unlock(&cli->cl_loi_list_lock);
874                 GOTO(out_free, rc = 0);
875         }
876         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
877         cli->cl_avail_grant = target_bytes;
878         spin_unlock(&cli->cl_loi_list_lock);
879         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
880                 body->oa.o_valid |= OBD_MD_FLFLAGS;
881                 body->oa.o_flags = 0;
882         }
883         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
884         osc_update_next_shrink(cli);
885
886         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
887                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
888                                 sizeof(*body), body, NULL);
889         if (rc != 0)
890                 __osc_update_grant(cli, body->oa.o_grant);
891 out_free:
892         OBD_FREE_PTR(body);
893         RETURN(rc);
894 }
895
896 static int osc_should_shrink_grant(struct client_obd *client)
897 {
898         time64_t next_shrink = client->cl_next_shrink_grant;
899
900         if (client->cl_import == NULL)
901                 return 0;
902
903         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
904             client->cl_import->imp_grant_shrink_disabled) {
905                 osc_update_next_shrink(client);
906                 return 0;
907         }
908
909         if (ktime_get_seconds() >= next_shrink - 5) {
910                 /* Get the current RPC size directly, instead of going via:
911                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
912                  * Keep comment here so that it can be found by searching. */
913                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
914
915                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
916                     client->cl_avail_grant > brw_size)
917                         return 1;
918                 else
919                         osc_update_next_shrink(client);
920         }
921         return 0;
922 }
923
924 #define GRANT_SHRINK_RPC_BATCH  100
925
926 static struct delayed_work work;
927
928 static void osc_grant_work_handler(struct work_struct *data)
929 {
930         struct client_obd *cli;
931         int rpc_sent;
932         bool init_next_shrink = true;
933         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
934
935         rpc_sent = 0;
936         mutex_lock(&client_gtd.gtd_mutex);
937         list_for_each_entry(cli, &client_gtd.gtd_clients,
938                             cl_grant_chain) {
939                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
940                     osc_should_shrink_grant(cli)) {
941                         osc_shrink_grant(cli);
942                         rpc_sent++;
943                 }
944
945                 if (!init_next_shrink) {
946                         if (cli->cl_next_shrink_grant < next_shrink &&
947                             cli->cl_next_shrink_grant > ktime_get_seconds())
948                                 next_shrink = cli->cl_next_shrink_grant;
949                 } else {
950                         init_next_shrink = false;
951                         next_shrink = cli->cl_next_shrink_grant;
952                 }
953         }
954         mutex_unlock(&client_gtd.gtd_mutex);
955
956         if (client_gtd.gtd_stopped == 1)
957                 return;
958
959         if (next_shrink > ktime_get_seconds()) {
960                 time64_t delay = next_shrink - ktime_get_seconds();
961
962                 schedule_delayed_work(&work, cfs_time_seconds(delay));
963         } else {
964                 schedule_work(&work.work);
965         }
966 }
967
968 void osc_schedule_grant_work(void)
969 {
970         cancel_delayed_work_sync(&work);
971         schedule_work(&work.work);
972 }
973
974 /**
975  * Start grant thread for returing grant to server for idle clients.
976  */
977 static int osc_start_grant_work(void)
978 {
979         client_gtd.gtd_stopped = 0;
980         mutex_init(&client_gtd.gtd_mutex);
981         INIT_LIST_HEAD(&client_gtd.gtd_clients);
982
983         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
984         schedule_work(&work.work);
985
986         return 0;
987 }
988
989 static void osc_stop_grant_work(void)
990 {
991         client_gtd.gtd_stopped = 1;
992         cancel_delayed_work_sync(&work);
993 }
994
995 static void osc_add_grant_list(struct client_obd *client)
996 {
997         mutex_lock(&client_gtd.gtd_mutex);
998         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
999         mutex_unlock(&client_gtd.gtd_mutex);
1000 }
1001
1002 static void osc_del_grant_list(struct client_obd *client)
1003 {
1004         if (list_empty(&client->cl_grant_chain))
1005                 return;
1006
1007         mutex_lock(&client_gtd.gtd_mutex);
1008         list_del_init(&client->cl_grant_chain);
1009         mutex_unlock(&client_gtd.gtd_mutex);
1010 }
1011
1012 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1013 {
1014         /*
1015          * ocd_grant is the total grant amount we're expect to hold: if we've
1016          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1017          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1018          * dirty.
1019          *
1020          * race is tolerable here: if we're evicted, but imp_state already
1021          * left EVICTED state, then cl_dirty_pages must be 0 already.
1022          */
1023         spin_lock(&cli->cl_loi_list_lock);
1024         cli->cl_avail_grant = ocd->ocd_grant;
1025         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1026                 cli->cl_avail_grant -= cli->cl_reserved_grant;
1027                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1028                         cli->cl_avail_grant -= cli->cl_dirty_grant;
1029                 else
1030                         cli->cl_avail_grant -=
1031                                         cli->cl_dirty_pages << PAGE_SHIFT;
1032         }
1033
1034         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1035                 u64 size;
1036                 int chunk_mask;
1037
1038                 /* overhead for each extent insertion */
1039                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1040                 /* determine the appropriate chunk size used by osc_extent. */
1041                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1042                                           ocd->ocd_grant_blkbits);
1043                 /* max_pages_per_rpc must be chunk aligned */
1044                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1045                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1046                                              ~chunk_mask) & chunk_mask;
1047                 /* determine maximum extent size, in #pages */
1048                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1049                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
1050                 if (cli->cl_max_extent_pages == 0)
1051                         cli->cl_max_extent_pages = 1;
1052         } else {
1053                 cli->cl_grant_extent_tax = 0;
1054                 cli->cl_chunkbits = PAGE_SHIFT;
1055                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1056         }
1057         spin_unlock(&cli->cl_loi_list_lock);
1058
1059         CDEBUG(D_CACHE,
1060                "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1061                cli_name(cli),
1062                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1063                cli->cl_max_extent_pages);
1064
1065         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1066                 osc_add_grant_list(cli);
1067 }
1068 EXPORT_SYMBOL(osc_init_grant);
1069
1070 /* We assume that the reason this OSC got a short read is because it read
1071  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1072  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1073  * this stripe never got written at or beyond this stripe offset yet. */
1074 static void handle_short_read(int nob_read, size_t page_count,
1075                               struct brw_page **pga)
1076 {
1077         char *ptr;
1078         int i = 0;
1079
1080         /* skip bytes read OK */
1081         while (nob_read > 0) {
1082                 LASSERT (page_count > 0);
1083
1084                 if (pga[i]->count > nob_read) {
1085                         /* EOF inside this page */
1086                         ptr = kmap(pga[i]->pg) +
1087                                 (pga[i]->off & ~PAGE_MASK);
1088                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1089                         kunmap(pga[i]->pg);
1090                         page_count--;
1091                         i++;
1092                         break;
1093                 }
1094
1095                 nob_read -= pga[i]->count;
1096                 page_count--;
1097                 i++;
1098         }
1099
1100         /* zero remaining pages */
1101         while (page_count-- > 0) {
1102                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1103                 memset(ptr, 0, pga[i]->count);
1104                 kunmap(pga[i]->pg);
1105                 i++;
1106         }
1107 }
1108
1109 static int check_write_rcs(struct ptlrpc_request *req,
1110                            int requested_nob, int niocount,
1111                            size_t page_count, struct brw_page **pga)
1112 {
1113         int     i;
1114         __u32   *remote_rcs;
1115
1116         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1117                                                   sizeof(*remote_rcs) *
1118                                                   niocount);
1119         if (remote_rcs == NULL) {
1120                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1121                 return(-EPROTO);
1122         }
1123
1124         /* return error if any niobuf was in error */
1125         for (i = 0; i < niocount; i++) {
1126                 if ((int)remote_rcs[i] < 0) {
1127                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1128                                i, remote_rcs[i], req);
1129                         return remote_rcs[i];
1130                 }
1131
1132                 if (remote_rcs[i] != 0) {
1133                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134                                 i, remote_rcs[i], req);
1135                         return(-EPROTO);
1136                 }
1137         }
1138         if (req->rq_bulk != NULL &&
1139             req->rq_bulk->bd_nob_transferred != requested_nob) {
1140                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141                        req->rq_bulk->bd_nob_transferred, requested_nob);
1142                 return(-EPROTO);
1143         }
1144
1145         return (0);
1146 }
1147
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1149 {
1150         if (p1->flag != p2->flag) {
1151                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1152                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1153                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1154
1155                 /* warn if we try to combine flags that we don't know to be
1156                  * safe to combine */
1157                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1158                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1159                               "report this at https://jira.whamcloud.com/\n",
1160                               p1->flag, p2->flag);
1161                 }
1162                 return 0;
1163         }
1164
1165         return (p1->off + p1->count == p2->off);
1166 }
1167
1168 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1169 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1170                                    size_t pg_count, struct brw_page **pga,
1171                                    int opc, obd_dif_csum_fn *fn,
1172                                    int sector_size,
1173                                    u32 *check_sum)
1174 {
1175         struct ahash_request *req;
1176         /* Used Adler as the default checksum type on top of DIF tags */
1177         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1178         struct page *__page;
1179         unsigned char *buffer;
1180         __u16 *guard_start;
1181         unsigned int bufsize;
1182         int guard_number;
1183         int used_number = 0;
1184         int used;
1185         u32 cksum;
1186         int rc = 0;
1187         int i = 0;
1188
1189         LASSERT(pg_count > 0);
1190
1191         __page = alloc_page(GFP_KERNEL);
1192         if (__page == NULL)
1193                 return -ENOMEM;
1194
1195         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1196         if (IS_ERR(req)) {
1197                 rc = PTR_ERR(req);
1198                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1199                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1200                 GOTO(out, rc);
1201         }
1202
1203         buffer = kmap(__page);
1204         guard_start = (__u16 *)buffer;
1205         guard_number = PAGE_SIZE / sizeof(*guard_start);
1206         while (nob > 0 && pg_count > 0) {
1207                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1208
1209                 /* corrupt the data before we compute the checksum, to
1210                  * simulate an OST->client data error */
1211                 if (unlikely(i == 0 && opc == OST_READ &&
1212                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1213                         unsigned char *ptr = kmap(pga[i]->pg);
1214                         int off = pga[i]->off & ~PAGE_MASK;
1215
1216                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1217                         kunmap(pga[i]->pg);
1218                 }
1219
1220                 /*
1221                  * The left guard number should be able to hold checksums of a
1222                  * whole page
1223                  */
1224                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1225                                                   pga[i]->off & ~PAGE_MASK,
1226                                                   count,
1227                                                   guard_start + used_number,
1228                                                   guard_number - used_number,
1229                                                   &used, sector_size,
1230                                                   fn);
1231                 if (rc)
1232                         break;
1233
1234                 used_number += used;
1235                 if (used_number == guard_number) {
1236                         cfs_crypto_hash_update_page(req, __page, 0,
1237                                 used_number * sizeof(*guard_start));
1238                         used_number = 0;
1239                 }
1240
1241                 nob -= pga[i]->count;
1242                 pg_count--;
1243                 i++;
1244         }
1245         kunmap(__page);
1246         if (rc)
1247                 GOTO(out, rc);
1248
1249         if (used_number != 0)
1250                 cfs_crypto_hash_update_page(req, __page, 0,
1251                         used_number * sizeof(*guard_start));
1252
1253         bufsize = sizeof(cksum);
1254         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1255
1256         /* For sending we only compute the wrong checksum instead
1257          * of corrupting the data so it is still correct on a redo */
1258         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1259                 cksum++;
1260
1261         *check_sum = cksum;
1262 out:
1263         __free_page(__page);
1264         return rc;
1265 }
1266 #else /* !CONFIG_CRC_T10DIF */
1267 #define obd_dif_ip_fn NULL
1268 #define obd_dif_crc_fn NULL
1269 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1270         -EOPNOTSUPP
1271 #endif /* CONFIG_CRC_T10DIF */
1272
1273 static int osc_checksum_bulk(int nob, size_t pg_count,
1274                              struct brw_page **pga, int opc,
1275                              enum cksum_types cksum_type,
1276                              u32 *cksum)
1277 {
1278         int                             i = 0;
1279         struct ahash_request           *req;
1280         unsigned int                    bufsize;
1281         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1282
1283         LASSERT(pg_count > 0);
1284
1285         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1286         if (IS_ERR(req)) {
1287                 CERROR("Unable to initialize checksum hash %s\n",
1288                        cfs_crypto_hash_name(cfs_alg));
1289                 return PTR_ERR(req);
1290         }
1291
1292         while (nob > 0 && pg_count > 0) {
1293                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1294
1295                 /* corrupt the data before we compute the checksum, to
1296                  * simulate an OST->client data error */
1297                 if (i == 0 && opc == OST_READ &&
1298                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1299                         unsigned char *ptr = kmap(pga[i]->pg);
1300                         int off = pga[i]->off & ~PAGE_MASK;
1301
1302                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1303                         kunmap(pga[i]->pg);
1304                 }
1305                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1306                                             pga[i]->off & ~PAGE_MASK,
1307                                             count);
1308                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1309                                (int)(pga[i]->off & ~PAGE_MASK));
1310
1311                 nob -= pga[i]->count;
1312                 pg_count--;
1313                 i++;
1314         }
1315
1316         bufsize = sizeof(*cksum);
1317         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1318
1319         /* For sending we only compute the wrong checksum instead
1320          * of corrupting the data so it is still correct on a redo */
1321         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1322                 (*cksum)++;
1323
1324         return 0;
1325 }
1326
1327 static int osc_checksum_bulk_rw(const char *obd_name,
1328                                 enum cksum_types cksum_type,
1329                                 int nob, size_t pg_count,
1330                                 struct brw_page **pga, int opc,
1331                                 u32 *check_sum)
1332 {
1333         obd_dif_csum_fn *fn = NULL;
1334         int sector_size = 0;
1335         int rc;
1336
1337         ENTRY;
1338         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1339
1340         if (fn)
1341                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1342                                              opc, fn, sector_size, check_sum);
1343         else
1344                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1345                                        check_sum);
1346
1347         RETURN(rc);
1348 }
1349
1350 static inline void osc_release_bounce_pages(struct brw_page **pga,
1351                                             u32 page_count)
1352 {
1353 #ifdef HAVE_LUSTRE_CRYPTO
1354         int i;
1355
1356         for (i = 0; i < page_count; i++) {
1357                 if (pga[i]->pg->mapping)
1358                         /* bounce pages are unmapped */
1359                         continue;
1360                 if (pga[i]->flag & OBD_BRW_SYNC)
1361                         /* sync transfer cannot have encrypted pages */
1362                         continue;
1363                 llcrypt_finalize_bounce_page(&pga[i]->pg);
1364                 pga[i]->count -= pga[i]->bp_count_diff;
1365                 pga[i]->off += pga[i]->bp_off_diff;
1366         }
1367 #endif
1368 }
1369
1370 static int
1371 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1372                      u32 page_count, struct brw_page **pga,
1373                      struct ptlrpc_request **reqp, int resend)
1374 {
1375         struct ptlrpc_request *req;
1376         struct ptlrpc_bulk_desc *desc;
1377         struct ost_body *body;
1378         struct obd_ioobj *ioobj;
1379         struct niobuf_remote *niobuf;
1380         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1381         struct osc_brw_async_args *aa;
1382         struct req_capsule *pill;
1383         struct brw_page *pg_prev;
1384         void *short_io_buf;
1385         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1386         struct inode *inode;
1387
1388         ENTRY;
1389         inode = page2inode(pga[0]->pg);
1390         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1391                 RETURN(-ENOMEM); /* Recoverable */
1392         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1393                 RETURN(-EINVAL); /* Fatal */
1394
1395         if ((cmd & OBD_BRW_WRITE) != 0) {
1396                 opc = OST_WRITE;
1397                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1398                                                 osc_rq_pool,
1399                                                 &RQF_OST_BRW_WRITE);
1400         } else {
1401                 opc = OST_READ;
1402                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1403         }
1404         if (req == NULL)
1405                 RETURN(-ENOMEM);
1406
1407         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1408                 for (i = 0; i < page_count; i++) {
1409                         struct brw_page *pg = pga[i];
1410                         struct page *data_page = NULL;
1411                         bool retried = false;
1412                         bool lockedbymyself;
1413
1414 retry_encrypt:
1415                         /* The page can already be locked when we arrive here.
1416                          * This is possible when cl_page_assume/vvp_page_assume
1417                          * is stuck on wait_on_page_writeback with page lock
1418                          * held. In this case there is no risk for the lock to
1419                          * be released while we are doing our encryption
1420                          * processing, because writeback against that page will
1421                          * end in vvp_page_completion_write/cl_page_completion,
1422                          * which means only once the page is fully processed.
1423                          */
1424                         lockedbymyself = trylock_page(pg->pg);
1425                         data_page =
1426                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1427                                                                  PAGE_SIZE, 0,
1428                                                                  GFP_NOFS);
1429                         if (lockedbymyself)
1430                                 unlock_page(pg->pg);
1431                         if (IS_ERR(data_page)) {
1432                                 rc = PTR_ERR(data_page);
1433                                 if (rc == -ENOMEM && !retried) {
1434                                         retried = true;
1435                                         rc = 0;
1436                                         goto retry_encrypt;
1437                                 }
1438                                 ptlrpc_request_free(req);
1439                                 RETURN(rc);
1440                         }
1441                         /* len is forced to PAGE_SIZE, and poff to 0
1442                          * so store the old, clear text info
1443                          */
1444                         pg->pg = data_page;
1445                         pg->bp_count_diff = PAGE_SIZE - pg->count;
1446                         pg->count = PAGE_SIZE;
1447                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1448                         pg->off = pg->off & PAGE_MASK;
1449                 }
1450         }
1451
1452         for (niocount = i = 1; i < page_count; i++) {
1453                 if (!can_merge_pages(pga[i - 1], pga[i]))
1454                         niocount++;
1455         }
1456
1457         pill = &req->rq_pill;
1458         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1459                              sizeof(*ioobj));
1460         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1461                              niocount * sizeof(*niobuf));
1462
1463         for (i = 0; i < page_count; i++)
1464                 short_io_size += pga[i]->count;
1465
1466         /* Check if read/write is small enough to be a short io. */
1467         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1468             !imp_connect_shortio(cli->cl_import))
1469                 short_io_size = 0;
1470
1471         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1472                              opc == OST_READ ? 0 : short_io_size);
1473         if (opc == OST_READ)
1474                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1475                                      short_io_size);
1476
1477         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1478         if (rc) {
1479                 ptlrpc_request_free(req);
1480                 RETURN(rc);
1481         }
1482         osc_set_io_portal(req);
1483
1484         ptlrpc_at_set_req_timeout(req);
1485         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1486          * retry logic */
1487         req->rq_no_retry_einprogress = 1;
1488
1489         if (short_io_size != 0) {
1490                 desc = NULL;
1491                 short_io_buf = NULL;
1492                 goto no_bulk;
1493         }
1494
1495         desc = ptlrpc_prep_bulk_imp(req, page_count,
1496                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1497                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1498                         PTLRPC_BULK_PUT_SINK),
1499                 OST_BULK_PORTAL,
1500                 &ptlrpc_bulk_kiov_pin_ops);
1501
1502         if (desc == NULL)
1503                 GOTO(out, rc = -ENOMEM);
1504         /* NB request now owns desc and will free it when it gets freed */
1505 no_bulk:
1506         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1507         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1508         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1509         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1510
1511         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1512
1513         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1514          * and from_kgid(), because they are asynchronous. Fortunately, variable
1515          * oa contains valid o_uid and o_gid in these two operations.
1516          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1517          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1518          * other process logic */
1519         body->oa.o_uid = oa->o_uid;
1520         body->oa.o_gid = oa->o_gid;
1521
1522         obdo_to_ioobj(oa, ioobj);
1523         ioobj->ioo_bufcnt = niocount;
1524         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1525          * that might be send for this request.  The actual number is decided
1526          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1527          * "max - 1" for old client compatibility sending "0", and also so the
1528          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1529         if (desc != NULL)
1530                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1531         else /* short io */
1532                 ioobj_max_brw_set(ioobj, 0);
1533
1534         if (short_io_size != 0) {
1535                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1536                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1537                         body->oa.o_flags = 0;
1538                 }
1539                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1540                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1541                        short_io_size);
1542                 if (opc == OST_WRITE) {
1543                         short_io_buf = req_capsule_client_get(pill,
1544                                                               &RMF_SHORT_IO);
1545                         LASSERT(short_io_buf != NULL);
1546                 }
1547         }
1548
1549         LASSERT(page_count > 0);
1550         pg_prev = pga[0];
1551         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1552                 struct brw_page *pg = pga[i];
1553                 int poff = pg->off & ~PAGE_MASK;
1554
1555                 LASSERT(pg->count > 0);
1556                 /* make sure there is no gap in the middle of page array */
1557                 LASSERTF(page_count == 1 ||
1558                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1559                           ergo(i > 0 && i < page_count - 1,
1560                                poff == 0 && pg->count == PAGE_SIZE)   &&
1561                           ergo(i == page_count - 1, poff == 0)),
1562                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1563                          i, page_count, pg, pg->off, pg->count);
1564                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1565                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1566                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1567                          i, page_count,
1568                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1569                          pg_prev->pg, page_private(pg_prev->pg),
1570                          pg_prev->pg->index, pg_prev->off);
1571                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1572                         (pg->flag & OBD_BRW_SRVLOCK));
1573                 if (short_io_size != 0 && opc == OST_WRITE) {
1574                         unsigned char *ptr = kmap_atomic(pg->pg);
1575
1576                         LASSERT(short_io_size >= requested_nob + pg->count);
1577                         memcpy(short_io_buf + requested_nob,
1578                                ptr + poff,
1579                                pg->count);
1580                         kunmap_atomic(ptr);
1581                 } else if (short_io_size == 0) {
1582                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1583                                                          pg->count);
1584                 }
1585                 requested_nob += pg->count;
1586
1587                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1588                         niobuf--;
1589                         niobuf->rnb_len += pg->count;
1590                 } else {
1591                         niobuf->rnb_offset = pg->off;
1592                         niobuf->rnb_len    = pg->count;
1593                         niobuf->rnb_flags  = pg->flag;
1594                 }
1595                 pg_prev = pg;
1596         }
1597
1598         LASSERTF((void *)(niobuf - niocount) ==
1599                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1600                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1601                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1602
1603         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1604         if (resend) {
1605                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1606                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1607                         body->oa.o_flags = 0;
1608                 }
1609                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1610         }
1611
1612         if (osc_should_shrink_grant(cli))
1613                 osc_shrink_grant_local(cli, &body->oa);
1614
1615         /* size[REQ_REC_OFF] still sizeof (*body) */
1616         if (opc == OST_WRITE) {
1617                 if (cli->cl_checksum &&
1618                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1619                         /* store cl_cksum_type in a local variable since
1620                          * it can be changed via lprocfs */
1621                         enum cksum_types cksum_type = cli->cl_cksum_type;
1622
1623                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1624                                 body->oa.o_flags = 0;
1625
1626                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1627                                                                 cksum_type);
1628                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1629
1630                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1631                                                   requested_nob, page_count,
1632                                                   pga, OST_WRITE,
1633                                                   &body->oa.o_cksum);
1634                         if (rc < 0) {
1635                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1636                                        rc);
1637                                 GOTO(out, rc);
1638                         }
1639                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1640                                body->oa.o_cksum);
1641
1642                         /* save this in 'oa', too, for later checking */
1643                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1644                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1645                                                            cksum_type);
1646                 } else {
1647                         /* clear out the checksum flag, in case this is a
1648                          * resend but cl_checksum is no longer set. b=11238 */
1649                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1650                 }
1651                 oa->o_cksum = body->oa.o_cksum;
1652                 /* 1 RC per niobuf */
1653                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1654                                      sizeof(__u32) * niocount);
1655         } else {
1656                 if (cli->cl_checksum &&
1657                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1658                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1659                                 body->oa.o_flags = 0;
1660                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1661                                 cli->cl_cksum_type);
1662                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1663                 }
1664
1665                 /* Client cksum has been already copied to wire obdo in previous
1666                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1667                  * resent due to cksum error, this will allow Server to
1668                  * check+dump pages on its side */
1669         }
1670         ptlrpc_request_set_replen(req);
1671
1672         aa = ptlrpc_req_async_args(aa, req);
1673         aa->aa_oa = oa;
1674         aa->aa_requested_nob = requested_nob;
1675         aa->aa_nio_count = niocount;
1676         aa->aa_page_count = page_count;
1677         aa->aa_resends = 0;
1678         aa->aa_ppga = pga;
1679         aa->aa_cli = cli;
1680         INIT_LIST_HEAD(&aa->aa_oaps);
1681
1682         *reqp = req;
1683         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1684         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1685                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1686                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1687         RETURN(0);
1688
1689  out:
1690         ptlrpc_req_finished(req);
1691         RETURN(rc);
1692 }
1693
1694 char dbgcksum_file_name[PATH_MAX];
1695
1696 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1697                                 struct brw_page **pga, __u32 server_cksum,
1698                                 __u32 client_cksum)
1699 {
1700         struct file *filp;
1701         int rc, i;
1702         unsigned int len;
1703         char *buf;
1704
1705         /* will only keep dump of pages on first error for the same range in
1706          * file/fid, not during the resends/retries. */
1707         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1708                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1709                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1710                   libcfs_debug_file_path_arr :
1711                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1712                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1713                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1714                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1715                  pga[0]->off,
1716                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1717                  client_cksum, server_cksum);
1718         filp = filp_open(dbgcksum_file_name,
1719                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1720         if (IS_ERR(filp)) {
1721                 rc = PTR_ERR(filp);
1722                 if (rc == -EEXIST)
1723                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1724                                "checksum error: rc = %d\n", dbgcksum_file_name,
1725                                rc);
1726                 else
1727                         CERROR("%s: can't open to dump pages with checksum "
1728                                "error: rc = %d\n", dbgcksum_file_name, rc);
1729                 return;
1730         }
1731
1732         for (i = 0; i < page_count; i++) {
1733                 len = pga[i]->count;
1734                 buf = kmap(pga[i]->pg);
1735                 while (len != 0) {
1736                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1737                         if (rc < 0) {
1738                                 CERROR("%s: wanted to write %u but got %d "
1739                                        "error\n", dbgcksum_file_name, len, rc);
1740                                 break;
1741                         }
1742                         len -= rc;
1743                         buf += rc;
1744                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1745                                dbgcksum_file_name, rc);
1746                 }
1747                 kunmap(pga[i]->pg);
1748         }
1749
1750         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1751         if (rc)
1752                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1753         filp_close(filp, NULL);
1754 }
1755
1756 static int
1757 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1758                      __u32 client_cksum, __u32 server_cksum,
1759                      struct osc_brw_async_args *aa)
1760 {
1761         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1762         enum cksum_types cksum_type;
1763         obd_dif_csum_fn *fn = NULL;
1764         int sector_size = 0;
1765         __u32 new_cksum;
1766         char *msg;
1767         int rc;
1768
1769         if (server_cksum == client_cksum) {
1770                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1771                 return 0;
1772         }
1773
1774         if (aa->aa_cli->cl_checksum_dump)
1775                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1776                                     server_cksum, client_cksum);
1777
1778         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1779                                            oa->o_flags : 0);
1780
1781         switch (cksum_type) {
1782         case OBD_CKSUM_T10IP512:
1783                 fn = obd_dif_ip_fn;
1784                 sector_size = 512;
1785                 break;
1786         case OBD_CKSUM_T10IP4K:
1787                 fn = obd_dif_ip_fn;
1788                 sector_size = 4096;
1789                 break;
1790         case OBD_CKSUM_T10CRC512:
1791                 fn = obd_dif_crc_fn;
1792                 sector_size = 512;
1793                 break;
1794         case OBD_CKSUM_T10CRC4K:
1795                 fn = obd_dif_crc_fn;
1796                 sector_size = 4096;
1797                 break;
1798         default:
1799                 break;
1800         }
1801
1802         if (fn)
1803                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1804                                              aa->aa_page_count, aa->aa_ppga,
1805                                              OST_WRITE, fn, sector_size,
1806                                              &new_cksum);
1807         else
1808                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1809                                        aa->aa_ppga, OST_WRITE, cksum_type,
1810                                        &new_cksum);
1811
1812         if (rc < 0)
1813                 msg = "failed to calculate the client write checksum";
1814         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1815                 msg = "the server did not use the checksum type specified in "
1816                       "the original request - likely a protocol problem";
1817         else if (new_cksum == server_cksum)
1818                 msg = "changed on the client after we checksummed it - "
1819                       "likely false positive due to mmap IO (bug 11742)";
1820         else if (new_cksum == client_cksum)
1821                 msg = "changed in transit before arrival at OST";
1822         else
1823                 msg = "changed in transit AND doesn't match the original - "
1824                       "likely false positive due to mmap IO (bug 11742)";
1825
1826         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1827                            DFID " object "DOSTID" extent [%llu-%llu], original "
1828                            "client csum %x (type %x), server csum %x (type %x),"
1829                            " client csum now %x\n",
1830                            obd_name, msg, libcfs_nid2str(peer->nid),
1831                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1832                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1833                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1834                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1835                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1836                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1837                            client_cksum,
1838                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1839                            server_cksum, cksum_type, new_cksum);
1840         return 1;
1841 }
1842
1843 /* Note rc enters this function as number of bytes transferred */
1844 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1845 {
1846         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1847         struct client_obd *cli = aa->aa_cli;
1848         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1849         const struct lnet_process_id *peer =
1850                 &req->rq_import->imp_connection->c_peer;
1851         struct ost_body *body;
1852         u32 client_cksum = 0;
1853
1854         ENTRY;
1855
1856         if (rc < 0 && rc != -EDQUOT) {
1857                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1858                 RETURN(rc);
1859         }
1860
1861         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1862         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1863         if (body == NULL) {
1864                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1865                 RETURN(-EPROTO);
1866         }
1867
1868         /* set/clear over quota flag for a uid/gid/projid */
1869         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1870             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1871                 unsigned qid[LL_MAXQUOTAS] = {
1872                                          body->oa.o_uid, body->oa.o_gid,
1873                                          body->oa.o_projid };
1874                 CDEBUG(D_QUOTA,
1875                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1876                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1877                        body->oa.o_valid, body->oa.o_flags);
1878                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1879                                        body->oa.o_flags);
1880         }
1881
1882         osc_update_grant(cli, body);
1883
1884         if (rc < 0)
1885                 RETURN(rc);
1886
1887         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1888                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1889
1890         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1891                 if (rc > 0) {
1892                         CERROR("%s: unexpected positive size %d\n",
1893                                obd_name, rc);
1894                         RETURN(-EPROTO);
1895                 }
1896
1897                 if (req->rq_bulk != NULL &&
1898                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1899                         RETURN(-EAGAIN);
1900
1901                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1902                     check_write_checksum(&body->oa, peer, client_cksum,
1903                                          body->oa.o_cksum, aa))
1904                         RETURN(-EAGAIN);
1905
1906                 rc = check_write_rcs(req, aa->aa_requested_nob,
1907                                      aa->aa_nio_count, aa->aa_page_count,
1908                                      aa->aa_ppga);
1909                 GOTO(out, rc);
1910         }
1911
1912         /* The rest of this function executes only for OST_READs */
1913
1914         if (req->rq_bulk == NULL) {
1915                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1916                                           RCL_SERVER);
1917                 LASSERT(rc == req->rq_status);
1918         } else {
1919                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1920                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1921         }
1922         if (rc < 0)
1923                 GOTO(out, rc = -EAGAIN);
1924
1925         if (rc > aa->aa_requested_nob) {
1926                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1927                        rc, aa->aa_requested_nob);
1928                 RETURN(-EPROTO);
1929         }
1930
1931         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1932                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1933                        rc, req->rq_bulk->bd_nob_transferred);
1934                 RETURN(-EPROTO);
1935         }
1936
1937         if (req->rq_bulk == NULL) {
1938                 /* short io */
1939                 int nob, pg_count, i = 0;
1940                 unsigned char *buf;
1941
1942                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1943                 pg_count = aa->aa_page_count;
1944                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1945                                                    rc);
1946                 nob = rc;
1947                 while (nob > 0 && pg_count > 0) {
1948                         unsigned char *ptr;
1949                         int count = aa->aa_ppga[i]->count > nob ?
1950                                     nob : aa->aa_ppga[i]->count;
1951
1952                         CDEBUG(D_CACHE, "page %p count %d\n",
1953                                aa->aa_ppga[i]->pg, count);
1954                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1955                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1956                                count);
1957                         kunmap_atomic((void *) ptr);
1958
1959                         buf += count;
1960                         nob -= count;
1961                         i++;
1962                         pg_count--;
1963                 }
1964         }
1965
1966         if (rc < aa->aa_requested_nob)
1967                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1968
1969         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1970                 static int cksum_counter;
1971                 u32        server_cksum = body->oa.o_cksum;
1972                 char      *via = "";
1973                 char      *router = "";
1974                 enum cksum_types cksum_type;
1975                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1976                         body->oa.o_flags : 0;
1977
1978                 cksum_type = obd_cksum_type_unpack(o_flags);
1979                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1980                                           aa->aa_page_count, aa->aa_ppga,
1981                                           OST_READ, &client_cksum);
1982                 if (rc < 0)
1983                         GOTO(out, rc);
1984
1985                 if (req->rq_bulk != NULL &&
1986                     peer->nid != req->rq_bulk->bd_sender) {
1987                         via = " via ";
1988                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1989                 }
1990
1991                 if (server_cksum != client_cksum) {
1992                         struct ost_body *clbody;
1993                         u32 page_count = aa->aa_page_count;
1994
1995                         clbody = req_capsule_client_get(&req->rq_pill,
1996                                                         &RMF_OST_BODY);
1997                         if (cli->cl_checksum_dump)
1998                                 dump_all_bulk_pages(&clbody->oa, page_count,
1999                                                     aa->aa_ppga, server_cksum,
2000                                                     client_cksum);
2001
2002                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2003                                            "%s%s%s inode "DFID" object "DOSTID
2004                                            " extent [%llu-%llu], client %x, "
2005                                            "server %x, cksum_type %x\n",
2006                                            obd_name,
2007                                            libcfs_nid2str(peer->nid),
2008                                            via, router,
2009                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2010                                                 clbody->oa.o_parent_seq : 0ULL,
2011                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2012                                                 clbody->oa.o_parent_oid : 0,
2013                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2014                                                 clbody->oa.o_parent_ver : 0,
2015                                            POSTID(&body->oa.o_oi),
2016                                            aa->aa_ppga[0]->off,
2017                                            aa->aa_ppga[page_count-1]->off +
2018                                            aa->aa_ppga[page_count-1]->count - 1,
2019                                            client_cksum, server_cksum,
2020                                            cksum_type);
2021                         cksum_counter = 0;
2022                         aa->aa_oa->o_cksum = client_cksum;
2023                         rc = -EAGAIN;
2024                 } else {
2025                         cksum_counter++;
2026                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2027                         rc = 0;
2028                 }
2029         } else if (unlikely(client_cksum)) {
2030                 static int cksum_missed;
2031
2032                 cksum_missed++;
2033                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2034                         CERROR("%s: checksum %u requested from %s but not sent\n",
2035                                obd_name, cksum_missed,
2036                                libcfs_nid2str(peer->nid));
2037         } else {
2038                 rc = 0;
2039         }
2040 out:
2041         if (rc >= 0)
2042                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2043                                      aa->aa_oa, &body->oa);
2044
2045         RETURN(rc);
2046 }
2047
2048 static int osc_brw_redo_request(struct ptlrpc_request *request,
2049                                 struct osc_brw_async_args *aa, int rc)
2050 {
2051         struct ptlrpc_request *new_req;
2052         struct osc_brw_async_args *new_aa;
2053         struct osc_async_page *oap;
2054         ENTRY;
2055
2056         /* The below message is checked in replay-ost-single.sh test_8ae*/
2057         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2058                   "redo for recoverable error %d", rc);
2059
2060         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2061                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2062                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2063                                   aa->aa_ppga, &new_req, 1);
2064         if (rc)
2065                 RETURN(rc);
2066
2067         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2068                 if (oap->oap_request != NULL) {
2069                         LASSERTF(request == oap->oap_request,
2070                                  "request %p != oap_request %p\n",
2071                                  request, oap->oap_request);
2072                 }
2073         }
2074         /*
2075          * New request takes over pga and oaps from old request.
2076          * Note that copying a list_head doesn't work, need to move it...
2077          */
2078         aa->aa_resends++;
2079         new_req->rq_interpret_reply = request->rq_interpret_reply;
2080         new_req->rq_async_args = request->rq_async_args;
2081         new_req->rq_commit_cb = request->rq_commit_cb;
2082         /* cap resend delay to the current request timeout, this is similar to
2083          * what ptlrpc does (see after_reply()) */
2084         if (aa->aa_resends > new_req->rq_timeout)
2085                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2086         else
2087                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2088         new_req->rq_generation_set = 1;
2089         new_req->rq_import_generation = request->rq_import_generation;
2090
2091         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2092
2093         INIT_LIST_HEAD(&new_aa->aa_oaps);
2094         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2095         INIT_LIST_HEAD(&new_aa->aa_exts);
2096         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2097         new_aa->aa_resends = aa->aa_resends;
2098
2099         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2100                 if (oap->oap_request) {
2101                         ptlrpc_req_finished(oap->oap_request);
2102                         oap->oap_request = ptlrpc_request_addref(new_req);
2103                 }
2104         }
2105
2106         /* XXX: This code will run into problem if we're going to support
2107          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
2108          * and wait for all of them to be finished. We should inherit request
2109          * set from old request. */
2110         ptlrpcd_add_req(new_req);
2111
2112         DEBUG_REQ(D_INFO, new_req, "new request");
2113         RETURN(0);
2114 }
2115
2116 /*
2117  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2118  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2119  * fine for our small page arrays and doesn't require allocation.  its an
2120  * insertion sort that swaps elements that are strides apart, shrinking the
2121  * stride down until its '1' and the array is sorted.
2122  */
2123 static void sort_brw_pages(struct brw_page **array, int num)
2124 {
2125         int stride, i, j;
2126         struct brw_page *tmp;
2127
2128         if (num == 1)
2129                 return;
2130         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2131                 ;
2132
2133         do {
2134                 stride /= 3;
2135                 for (i = stride ; i < num ; i++) {
2136                         tmp = array[i];
2137                         j = i;
2138                         while (j >= stride && array[j - stride]->off > tmp->off) {
2139                                 array[j] = array[j - stride];
2140                                 j -= stride;
2141                         }
2142                         array[j] = tmp;
2143                 }
2144         } while (stride > 1);
2145 }
2146
2147 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2148 {
2149         LASSERT(ppga != NULL);
2150         OBD_FREE_PTR_ARRAY(ppga, count);
2151 }
2152
2153 static int brw_interpret(const struct lu_env *env,
2154                          struct ptlrpc_request *req, void *args, int rc)
2155 {
2156         struct osc_brw_async_args *aa = args;
2157         struct osc_extent *ext;
2158         struct osc_extent *tmp;
2159         struct client_obd *cli = aa->aa_cli;
2160         unsigned long transferred = 0;
2161
2162         ENTRY;
2163
2164         rc = osc_brw_fini_request(req, rc);
2165         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2166
2167         /* restore clear text pages */
2168         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2169
2170         /*
2171          * When server returns -EINPROGRESS, client should always retry
2172          * regardless of the number of times the bulk was resent already.
2173          */
2174         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2175                 if (req->rq_import_generation !=
2176                     req->rq_import->imp_generation) {
2177                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2178                                ""DOSTID", rc = %d.\n",
2179                                req->rq_import->imp_obd->obd_name,
2180                                POSTID(&aa->aa_oa->o_oi), rc);
2181                 } else if (rc == -EINPROGRESS ||
2182                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2183                         rc = osc_brw_redo_request(req, aa, rc);
2184                 } else {
2185                         CERROR("%s: too many resent retries for object: "
2186                                "%llu:%llu, rc = %d.\n",
2187                                req->rq_import->imp_obd->obd_name,
2188                                POSTID(&aa->aa_oa->o_oi), rc);
2189                 }
2190
2191                 if (rc == 0)
2192                         RETURN(0);
2193                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2194                         rc = -EIO;
2195         }
2196
2197         if (rc == 0) {
2198                 struct obdo *oa = aa->aa_oa;
2199                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2200                 unsigned long valid = 0;
2201                 struct cl_object *obj;
2202                 struct osc_async_page *last;
2203
2204                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2205                 obj = osc2cl(last->oap_obj);
2206
2207                 cl_object_attr_lock(obj);
2208                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2209                         attr->cat_blocks = oa->o_blocks;
2210                         valid |= CAT_BLOCKS;
2211                 }
2212                 if (oa->o_valid & OBD_MD_FLMTIME) {
2213                         attr->cat_mtime = oa->o_mtime;
2214                         valid |= CAT_MTIME;
2215                 }
2216                 if (oa->o_valid & OBD_MD_FLATIME) {
2217                         attr->cat_atime = oa->o_atime;
2218                         valid |= CAT_ATIME;
2219                 }
2220                 if (oa->o_valid & OBD_MD_FLCTIME) {
2221                         attr->cat_ctime = oa->o_ctime;
2222                         valid |= CAT_CTIME;
2223                 }
2224
2225                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2226                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2227                         loff_t last_off = last->oap_count + last->oap_obj_off +
2228                                 last->oap_page_off;
2229
2230                         /* Change file size if this is an out of quota or
2231                          * direct IO write and it extends the file size */
2232                         if (loi->loi_lvb.lvb_size < last_off) {
2233                                 attr->cat_size = last_off;
2234                                 valid |= CAT_SIZE;
2235                         }
2236                         /* Extend KMS if it's not a lockless write */
2237                         if (loi->loi_kms < last_off &&
2238                             oap2osc_page(last)->ops_srvlock == 0) {
2239                                 attr->cat_kms = last_off;
2240                                 valid |= CAT_KMS;
2241                         }
2242                 }
2243
2244                 if (valid != 0)
2245                         cl_object_attr_update(env, obj, attr, valid);
2246                 cl_object_attr_unlock(obj);
2247         }
2248         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2249         aa->aa_oa = NULL;
2250
2251         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2252                 osc_inc_unstable_pages(req);
2253
2254         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2255                 list_del_init(&ext->oe_link);
2256                 osc_extent_finish(env, ext, 1,
2257                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2258         }
2259         LASSERT(list_empty(&aa->aa_exts));
2260         LASSERT(list_empty(&aa->aa_oaps));
2261
2262         transferred = (req->rq_bulk == NULL ? /* short io */
2263                        aa->aa_requested_nob :
2264                        req->rq_bulk->bd_nob_transferred);
2265
2266         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2267         ptlrpc_lprocfs_brw(req, transferred);
2268
2269         spin_lock(&cli->cl_loi_list_lock);
2270         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2271          * is called so we know whether to go to sync BRWs or wait for more
2272          * RPCs to complete */
2273         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2274                 cli->cl_w_in_flight--;
2275         else
2276                 cli->cl_r_in_flight--;
2277         osc_wake_cache_waiters(cli);
2278         spin_unlock(&cli->cl_loi_list_lock);
2279
2280         osc_io_unplug(env, cli, NULL);
2281         RETURN(rc);
2282 }
2283
2284 static void brw_commit(struct ptlrpc_request *req)
2285 {
2286         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2287          * this called via the rq_commit_cb, I need to ensure
2288          * osc_dec_unstable_pages is still called. Otherwise unstable
2289          * pages may be leaked. */
2290         spin_lock(&req->rq_lock);
2291         if (likely(req->rq_unstable)) {
2292                 req->rq_unstable = 0;
2293                 spin_unlock(&req->rq_lock);
2294
2295                 osc_dec_unstable_pages(req);
2296         } else {
2297                 req->rq_committed = 1;
2298                 spin_unlock(&req->rq_lock);
2299         }
2300 }
2301
2302 /**
2303  * Build an RPC by the list of extent @ext_list. The caller must ensure
2304  * that the total pages in this list are NOT over max pages per RPC.
2305  * Extents in the list must be in OES_RPC state.
2306  */
2307 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2308                   struct list_head *ext_list, int cmd)
2309 {
2310         struct ptlrpc_request           *req = NULL;
2311         struct osc_extent               *ext;
2312         struct brw_page                 **pga = NULL;
2313         struct osc_brw_async_args       *aa = NULL;
2314         struct obdo                     *oa = NULL;
2315         struct osc_async_page           *oap;
2316         struct osc_object               *obj = NULL;
2317         struct cl_req_attr              *crattr = NULL;
2318         loff_t                          starting_offset = OBD_OBJECT_EOF;
2319         loff_t                          ending_offset = 0;
2320         /* '1' for consistency with code that checks !mpflag to restore */
2321         int mpflag = 1;
2322         int                             mem_tight = 0;
2323         int                             page_count = 0;
2324         bool                            soft_sync = false;
2325         bool                            ndelay = false;
2326         int                             i;
2327         int                             grant = 0;
2328         int                             rc;
2329         __u32                           layout_version = 0;
2330         LIST_HEAD(rpc_list);
2331         struct ost_body                 *body;
2332         ENTRY;
2333         LASSERT(!list_empty(ext_list));
2334
2335         /* add pages into rpc_list to build BRW rpc */
2336         list_for_each_entry(ext, ext_list, oe_link) {
2337                 LASSERT(ext->oe_state == OES_RPC);
2338                 mem_tight |= ext->oe_memalloc;
2339                 grant += ext->oe_grants;
2340                 page_count += ext->oe_nr_pages;
2341                 layout_version = max(layout_version, ext->oe_layout_version);
2342                 if (obj == NULL)
2343                         obj = ext->oe_obj;
2344         }
2345
2346         soft_sync = osc_over_unstable_soft_limit(cli);
2347         if (mem_tight)
2348                 mpflag = memalloc_noreclaim_save();
2349
2350         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2351         if (pga == NULL)
2352                 GOTO(out, rc = -ENOMEM);
2353
2354         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2355         if (oa == NULL)
2356                 GOTO(out, rc = -ENOMEM);
2357
2358         i = 0;
2359         list_for_each_entry(ext, ext_list, oe_link) {
2360                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2361                         if (mem_tight)
2362                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2363                         if (soft_sync)
2364                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2365                         pga[i] = &oap->oap_brw_page;
2366                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2367                         i++;
2368
2369                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2370                         if (starting_offset == OBD_OBJECT_EOF ||
2371                             starting_offset > oap->oap_obj_off)
2372                                 starting_offset = oap->oap_obj_off;
2373                         else
2374                                 LASSERT(oap->oap_page_off == 0);
2375                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2376                                 ending_offset = oap->oap_obj_off +
2377                                                 oap->oap_count;
2378                         else
2379                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2380                                         PAGE_SIZE);
2381                 }
2382                 if (ext->oe_ndelay)
2383                         ndelay = true;
2384         }
2385
2386         /* first page in the list */
2387         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2388
2389         crattr = &osc_env_info(env)->oti_req_attr;
2390         memset(crattr, 0, sizeof(*crattr));
2391         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2392         crattr->cra_flags = ~0ULL;
2393         crattr->cra_page = oap2cl_page(oap);
2394         crattr->cra_oa = oa;
2395         cl_req_attr_set(env, osc2cl(obj), crattr);
2396
2397         if (cmd == OBD_BRW_WRITE) {
2398                 oa->o_grant_used = grant;
2399                 if (layout_version > 0) {
2400                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2401                                PFID(&oa->o_oi.oi_fid), layout_version);
2402
2403                         oa->o_layout_version = layout_version;
2404                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2405                 }
2406         }
2407
2408         sort_brw_pages(pga, page_count);
2409         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2410         if (rc != 0) {
2411                 CERROR("prep_req failed: %d\n", rc);
2412                 GOTO(out, rc);
2413         }
2414
2415         req->rq_commit_cb = brw_commit;
2416         req->rq_interpret_reply = brw_interpret;
2417         req->rq_memalloc = mem_tight != 0;
2418         oap->oap_request = ptlrpc_request_addref(req);
2419         if (ndelay) {
2420                 req->rq_no_resend = req->rq_no_delay = 1;
2421                 /* probably set a shorter timeout value.
2422                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2423                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2424         }
2425
2426         /* Need to update the timestamps after the request is built in case
2427          * we race with setattr (locally or in queue at OST).  If OST gets
2428          * later setattr before earlier BRW (as determined by the request xid),
2429          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2430          * way to do this in a single call.  bug 10150 */
2431         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2432         crattr->cra_oa = &body->oa;
2433         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2434         cl_req_attr_set(env, osc2cl(obj), crattr);
2435         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2436
2437         aa = ptlrpc_req_async_args(aa, req);
2438         INIT_LIST_HEAD(&aa->aa_oaps);
2439         list_splice_init(&rpc_list, &aa->aa_oaps);
2440         INIT_LIST_HEAD(&aa->aa_exts);
2441         list_splice_init(ext_list, &aa->aa_exts);
2442
2443         spin_lock(&cli->cl_loi_list_lock);
2444         starting_offset >>= PAGE_SHIFT;
2445         if (cmd == OBD_BRW_READ) {
2446                 cli->cl_r_in_flight++;
2447                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2448                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2449                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2450                                       starting_offset + 1);
2451         } else {
2452                 cli->cl_w_in_flight++;
2453                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2454                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2455                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2456                                       starting_offset + 1);
2457         }
2458         spin_unlock(&cli->cl_loi_list_lock);
2459
2460         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2461                   page_count, aa, cli->cl_r_in_flight,
2462                   cli->cl_w_in_flight);
2463         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2464
2465         ptlrpcd_add_req(req);
2466         rc = 0;
2467         EXIT;
2468
2469 out:
2470         if (mem_tight)
2471                 memalloc_noreclaim_restore(mpflag);
2472
2473         if (rc != 0) {
2474                 LASSERT(req == NULL);
2475
2476                 if (oa)
2477                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2478                 if (pga) {
2479                         osc_release_bounce_pages(pga, page_count);
2480                         osc_release_ppga(pga, page_count);
2481                 }
2482                 /* this should happen rarely and is pretty bad, it makes the
2483                  * pending list not follow the dirty order */
2484                 while (!list_empty(ext_list)) {
2485                         ext = list_entry(ext_list->next, struct osc_extent,
2486                                          oe_link);
2487                         list_del_init(&ext->oe_link);
2488                         osc_extent_finish(env, ext, 0, rc);
2489                 }
2490         }
2491         RETURN(rc);
2492 }
2493
2494 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2495 {
2496         int set = 0;
2497
2498         LASSERT(lock != NULL);
2499
2500         lock_res_and_lock(lock);
2501
2502         if (lock->l_ast_data == NULL)
2503                 lock->l_ast_data = data;
2504         if (lock->l_ast_data == data)
2505                 set = 1;
2506
2507         unlock_res_and_lock(lock);
2508
2509         return set;
2510 }
2511
2512 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2513                      void *cookie, struct lustre_handle *lockh,
2514                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2515                      int errcode)
2516 {
2517         bool intent = *flags & LDLM_FL_HAS_INTENT;
2518         int rc;
2519         ENTRY;
2520
2521         /* The request was created before ldlm_cli_enqueue call. */
2522         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2523                 struct ldlm_reply *rep;
2524
2525                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2526                 LASSERT(rep != NULL);
2527
2528                 rep->lock_policy_res1 =
2529                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2530                 if (rep->lock_policy_res1)
2531                         errcode = rep->lock_policy_res1;
2532                 if (!speculative)
2533                         *flags |= LDLM_FL_LVB_READY;
2534         } else if (errcode == ELDLM_OK) {
2535                 *flags |= LDLM_FL_LVB_READY;
2536         }
2537
2538         /* Call the update callback. */
2539         rc = (*upcall)(cookie, lockh, errcode);
2540
2541         /* release the reference taken in ldlm_cli_enqueue() */
2542         if (errcode == ELDLM_LOCK_MATCHED)
2543                 errcode = ELDLM_OK;
2544         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2545                 ldlm_lock_decref(lockh, mode);
2546
2547         RETURN(rc);
2548 }
2549
2550 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2551                           void *args, int rc)
2552 {
2553         struct osc_enqueue_args *aa = args;
2554         struct ldlm_lock *lock;
2555         struct lustre_handle *lockh = &aa->oa_lockh;
2556         enum ldlm_mode mode = aa->oa_mode;
2557         struct ost_lvb *lvb = aa->oa_lvb;
2558         __u32 lvb_len = sizeof(*lvb);
2559         __u64 flags = 0;
2560
2561         ENTRY;
2562
2563         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2564          * be valid. */
2565         lock = ldlm_handle2lock(lockh);
2566         LASSERTF(lock != NULL,
2567                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2568                  lockh->cookie, req, aa);
2569
2570         /* Take an additional reference so that a blocking AST that
2571          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2572          * to arrive after an upcall has been executed by
2573          * osc_enqueue_fini(). */
2574         ldlm_lock_addref(lockh, mode);
2575
2576         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2577         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2578
2579         /* Let CP AST to grant the lock first. */
2580         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2581
2582         if (aa->oa_speculative) {
2583                 LASSERT(aa->oa_lvb == NULL);
2584                 LASSERT(aa->oa_flags == NULL);
2585                 aa->oa_flags = &flags;
2586         }
2587
2588         /* Complete obtaining the lock procedure. */
2589         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2590                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2591                                    lockh, rc);
2592         /* Complete osc stuff. */
2593         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2594                               aa->oa_flags, aa->oa_speculative, rc);
2595
2596         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2597
2598         ldlm_lock_decref(lockh, mode);
2599         LDLM_LOCK_PUT(lock);
2600         RETURN(rc);
2601 }
2602
2603 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2604  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2605  * other synchronous requests, however keeping some locks and trying to obtain
2606  * others may take a considerable amount of time in a case of ost failure; and
2607  * when other sync requests do not get released lock from a client, the client
2608  * is evicted from the cluster -- such scenarious make the life difficult, so
2609  * release locks just after they are obtained. */
2610 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2611                      __u64 *flags, union ldlm_policy_data *policy,
2612                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2613                      void *cookie, struct ldlm_enqueue_info *einfo,
2614                      struct ptlrpc_request_set *rqset, int async,
2615                      bool speculative)
2616 {
2617         struct obd_device *obd = exp->exp_obd;
2618         struct lustre_handle lockh = { 0 };
2619         struct ptlrpc_request *req = NULL;
2620         int intent = *flags & LDLM_FL_HAS_INTENT;
2621         __u64 match_flags = *flags;
2622         enum ldlm_mode mode;
2623         int rc;
2624         ENTRY;
2625
2626         /* Filesystem lock extents are extended to page boundaries so that
2627          * dealing with the page cache is a little smoother.  */
2628         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2629         policy->l_extent.end |= ~PAGE_MASK;
2630
2631         /* Next, search for already existing extent locks that will cover us */
2632         /* If we're trying to read, we also search for an existing PW lock.  The
2633          * VFS and page cache already protect us locally, so lots of readers/
2634          * writers can share a single PW lock.
2635          *
2636          * There are problems with conversion deadlocks, so instead of
2637          * converting a read lock to a write lock, we'll just enqueue a new
2638          * one.
2639          *
2640          * At some point we should cancel the read lock instead of making them
2641          * send us a blocking callback, but there are problems with canceling
2642          * locks out from other users right now, too. */
2643         mode = einfo->ei_mode;
2644         if (einfo->ei_mode == LCK_PR)
2645                 mode |= LCK_PW;
2646         /* Normal lock requests must wait for the LVB to be ready before
2647          * matching a lock; speculative lock requests do not need to,
2648          * because they will not actually use the lock. */
2649         if (!speculative)
2650                 match_flags |= LDLM_FL_LVB_READY;
2651         if (intent != 0)
2652                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2653         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2654                                einfo->ei_type, policy, mode, &lockh, 0);
2655         if (mode) {
2656                 struct ldlm_lock *matched;
2657
2658                 if (*flags & LDLM_FL_TEST_LOCK)
2659                         RETURN(ELDLM_OK);
2660
2661                 matched = ldlm_handle2lock(&lockh);
2662                 if (speculative) {
2663                         /* This DLM lock request is speculative, and does not
2664                          * have an associated IO request. Therefore if there
2665                          * is already a DLM lock, it wll just inform the
2666                          * caller to cancel the request for this stripe.*/
2667                         lock_res_and_lock(matched);
2668                         if (ldlm_extent_equal(&policy->l_extent,
2669                             &matched->l_policy_data.l_extent))
2670                                 rc = -EEXIST;
2671                         else
2672                                 rc = -ECANCELED;
2673                         unlock_res_and_lock(matched);
2674
2675                         ldlm_lock_decref(&lockh, mode);
2676                         LDLM_LOCK_PUT(matched);
2677                         RETURN(rc);
2678                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2679                         *flags |= LDLM_FL_LVB_READY;
2680
2681                         /* We already have a lock, and it's referenced. */
2682                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2683
2684                         ldlm_lock_decref(&lockh, mode);
2685                         LDLM_LOCK_PUT(matched);
2686                         RETURN(ELDLM_OK);
2687                 } else {
2688                         ldlm_lock_decref(&lockh, mode);
2689                         LDLM_LOCK_PUT(matched);
2690                 }
2691         }
2692
2693         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2694                 RETURN(-ENOLCK);
2695
2696         if (intent) {
2697                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2698                                            &RQF_LDLM_ENQUEUE_LVB);
2699                 if (req == NULL)
2700                         RETURN(-ENOMEM);
2701
2702                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2703                 if (rc) {
2704                         ptlrpc_request_free(req);
2705                         RETURN(rc);
2706                 }
2707
2708                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2709                                      sizeof *lvb);
2710                 ptlrpc_request_set_replen(req);
2711         }
2712
2713         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2714         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2715
2716         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2717                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2718         if (async) {
2719                 if (!rc) {
2720                         struct osc_enqueue_args *aa;
2721                         aa = ptlrpc_req_async_args(aa, req);
2722                         aa->oa_exp         = exp;
2723                         aa->oa_mode        = einfo->ei_mode;
2724                         aa->oa_type        = einfo->ei_type;
2725                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2726                         aa->oa_upcall      = upcall;
2727                         aa->oa_cookie      = cookie;
2728                         aa->oa_speculative = speculative;
2729                         if (!speculative) {
2730                                 aa->oa_flags  = flags;
2731                                 aa->oa_lvb    = lvb;
2732                         } else {
2733                                 /* speculative locks are essentially to enqueue
2734                                  * a DLM lock  in advance, so we don't care
2735                                  * about the result of the enqueue. */
2736                                 aa->oa_lvb    = NULL;
2737                                 aa->oa_flags  = NULL;
2738                         }
2739
2740                         req->rq_interpret_reply = osc_enqueue_interpret;
2741                         ptlrpc_set_add_req(rqset, req);
2742                 } else if (intent) {
2743                         ptlrpc_req_finished(req);
2744                 }
2745                 RETURN(rc);
2746         }
2747
2748         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2749                               flags, speculative, rc);
2750         if (intent)
2751                 ptlrpc_req_finished(req);
2752
2753         RETURN(rc);
2754 }
2755
2756 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2757                    struct ldlm_res_id *res_id, enum ldlm_type type,
2758                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2759                    __u64 *flags, struct osc_object *obj,
2760                    struct lustre_handle *lockh, int unref)
2761 {
2762         struct obd_device *obd = exp->exp_obd;
2763         __u64 lflags = *flags;
2764         enum ldlm_mode rc;
2765         ENTRY;
2766
2767         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2768                 RETURN(-EIO);
2769
2770         /* Filesystem lock extents are extended to page boundaries so that
2771          * dealing with the page cache is a little smoother */
2772         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2773         policy->l_extent.end |= ~PAGE_MASK;
2774
2775         /* Next, search for already existing extent locks that will cover us */
2776         /* If we're trying to read, we also search for an existing PW lock.  The
2777          * VFS and page cache already protect us locally, so lots of readers/
2778          * writers can share a single PW lock. */
2779         rc = mode;
2780         if (mode == LCK_PR)
2781                 rc |= LCK_PW;
2782         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2783                              res_id, type, policy, rc, lockh, unref);
2784         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2785                 RETURN(rc);
2786
2787         if (obj != NULL) {
2788                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2789
2790                 LASSERT(lock != NULL);
2791                 if (osc_set_lock_data(lock, obj)) {
2792                         lock_res_and_lock(lock);
2793                         if (!ldlm_is_lvb_cached(lock)) {
2794                                 LASSERT(lock->l_ast_data == obj);
2795                                 osc_lock_lvb_update(env, obj, lock, NULL);
2796                                 ldlm_set_lvb_cached(lock);
2797                         }
2798                         unlock_res_and_lock(lock);
2799                 } else {
2800                         ldlm_lock_decref(lockh, rc);
2801                         rc = 0;
2802                 }
2803                 LDLM_LOCK_PUT(lock);
2804         }
2805         RETURN(rc);
2806 }
2807
2808 static int osc_statfs_interpret(const struct lu_env *env,
2809                                 struct ptlrpc_request *req, void *args, int rc)
2810 {
2811         struct osc_async_args *aa = args;
2812         struct obd_statfs *msfs;
2813
2814         ENTRY;
2815         if (rc == -EBADR)
2816                 /*
2817                  * The request has in fact never been sent due to issues at
2818                  * a higher level (LOV).  Exit immediately since the caller
2819                  * is aware of the problem and takes care of the clean up.
2820                  */
2821                 RETURN(rc);
2822
2823         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2824             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2825                 GOTO(out, rc = 0);
2826
2827         if (rc != 0)
2828                 GOTO(out, rc);
2829
2830         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2831         if (msfs == NULL)
2832                 GOTO(out, rc = -EPROTO);
2833
2834         *aa->aa_oi->oi_osfs = *msfs;
2835 out:
2836         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2837
2838         RETURN(rc);
2839 }
2840
2841 static int osc_statfs_async(struct obd_export *exp,
2842                             struct obd_info *oinfo, time64_t max_age,
2843                             struct ptlrpc_request_set *rqset)
2844 {
2845         struct obd_device     *obd = class_exp2obd(exp);
2846         struct ptlrpc_request *req;
2847         struct osc_async_args *aa;
2848         int rc;
2849         ENTRY;
2850
2851         if (obd->obd_osfs_age >= max_age) {
2852                 CDEBUG(D_SUPER,
2853                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2854                        obd->obd_name, &obd->obd_osfs,
2855                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2856                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2857                 spin_lock(&obd->obd_osfs_lock);
2858                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2859                 spin_unlock(&obd->obd_osfs_lock);
2860                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2861                 if (oinfo->oi_cb_up)
2862                         oinfo->oi_cb_up(oinfo, 0);
2863
2864                 RETURN(0);
2865         }
2866
2867         /* We could possibly pass max_age in the request (as an absolute
2868          * timestamp or a "seconds.usec ago") so the target can avoid doing
2869          * extra calls into the filesystem if that isn't necessary (e.g.
2870          * during mount that would help a bit).  Having relative timestamps
2871          * is not so great if request processing is slow, while absolute
2872          * timestamps are not ideal because they need time synchronization. */
2873         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2874         if (req == NULL)
2875                 RETURN(-ENOMEM);
2876
2877         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2878         if (rc) {
2879                 ptlrpc_request_free(req);
2880                 RETURN(rc);
2881         }
2882         ptlrpc_request_set_replen(req);
2883         req->rq_request_portal = OST_CREATE_PORTAL;
2884         ptlrpc_at_set_req_timeout(req);
2885
2886         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2887                 /* procfs requests not want stat in wait for avoid deadlock */
2888                 req->rq_no_resend = 1;
2889                 req->rq_no_delay = 1;
2890         }
2891
2892         req->rq_interpret_reply = osc_statfs_interpret;
2893         aa = ptlrpc_req_async_args(aa, req);
2894         aa->aa_oi = oinfo;
2895
2896         ptlrpc_set_add_req(rqset, req);
2897         RETURN(0);
2898 }
2899
2900 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2901                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2902 {
2903         struct obd_device     *obd = class_exp2obd(exp);
2904         struct obd_statfs     *msfs;
2905         struct ptlrpc_request *req;
2906         struct obd_import     *imp = NULL;
2907         int rc;
2908         ENTRY;
2909
2910
2911         /*Since the request might also come from lprocfs, so we need
2912          *sync this with client_disconnect_export Bug15684*/
2913         down_read(&obd->u.cli.cl_sem);
2914         if (obd->u.cli.cl_import)
2915                 imp = class_import_get(obd->u.cli.cl_import);
2916         up_read(&obd->u.cli.cl_sem);
2917         if (!imp)
2918                 RETURN(-ENODEV);
2919
2920         /* We could possibly pass max_age in the request (as an absolute
2921          * timestamp or a "seconds.usec ago") so the target can avoid doing
2922          * extra calls into the filesystem if that isn't necessary (e.g.
2923          * during mount that would help a bit).  Having relative timestamps
2924          * is not so great if request processing is slow, while absolute
2925          * timestamps are not ideal because they need time synchronization. */
2926         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2927
2928         class_import_put(imp);
2929
2930         if (req == NULL)
2931                 RETURN(-ENOMEM);
2932
2933         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2934         if (rc) {
2935                 ptlrpc_request_free(req);
2936                 RETURN(rc);
2937         }
2938         ptlrpc_request_set_replen(req);
2939         req->rq_request_portal = OST_CREATE_PORTAL;
2940         ptlrpc_at_set_req_timeout(req);
2941
2942         if (flags & OBD_STATFS_NODELAY) {
2943                 /* procfs requests not want stat in wait for avoid deadlock */
2944                 req->rq_no_resend = 1;
2945                 req->rq_no_delay = 1;
2946         }
2947
2948         rc = ptlrpc_queue_wait(req);
2949         if (rc)
2950                 GOTO(out, rc);
2951
2952         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2953         if (msfs == NULL)
2954                 GOTO(out, rc = -EPROTO);
2955
2956         *osfs = *msfs;
2957
2958         EXIT;
2959 out:
2960         ptlrpc_req_finished(req);
2961         return rc;
2962 }
2963
2964 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2965                          void *karg, void __user *uarg)
2966 {
2967         struct obd_device *obd = exp->exp_obd;
2968         struct obd_ioctl_data *data = karg;
2969         int rc = 0;
2970
2971         ENTRY;
2972         if (!try_module_get(THIS_MODULE)) {
2973                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2974                        module_name(THIS_MODULE));
2975                 return -EINVAL;
2976         }
2977         switch (cmd) {
2978         case OBD_IOC_CLIENT_RECOVER:
2979                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2980                                            data->ioc_inlbuf1, 0);
2981                 if (rc > 0)
2982                         rc = 0;
2983                 break;
2984         case IOC_OSC_SET_ACTIVE:
2985                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2986                                               data->ioc_offset);
2987                 break;
2988         default:
2989                 rc = -ENOTTY;
2990                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2991                        obd->obd_name, cmd, current->comm, rc);
2992                 break;
2993         }
2994
2995         module_put(THIS_MODULE);
2996         return rc;
2997 }
2998
2999 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3000                        u32 keylen, void *key, u32 vallen, void *val,
3001                        struct ptlrpc_request_set *set)
3002 {
3003         struct ptlrpc_request *req;
3004         struct obd_device     *obd = exp->exp_obd;
3005         struct obd_import     *imp = class_exp2cliimp(exp);
3006         char                  *tmp;
3007         int                    rc;
3008         ENTRY;
3009
3010         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3011
3012         if (KEY_IS(KEY_CHECKSUM)) {
3013                 if (vallen != sizeof(int))
3014                         RETURN(-EINVAL);
3015                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3016                 RETURN(0);
3017         }
3018
3019         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3020                 sptlrpc_conf_client_adapt(obd);
3021                 RETURN(0);
3022         }
3023
3024         if (KEY_IS(KEY_FLUSH_CTX)) {
3025                 sptlrpc_import_flush_my_ctx(imp);
3026                 RETURN(0);
3027         }
3028
3029         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3030                 struct client_obd *cli = &obd->u.cli;
3031                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3032                 long target = *(long *)val;
3033
3034                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3035                 *(long *)val -= nr;
3036                 RETURN(0);
3037         }
3038
3039         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3040                 RETURN(-EINVAL);
3041
3042         /* We pass all other commands directly to OST. Since nobody calls osc
3043            methods directly and everybody is supposed to go through LOV, we
3044            assume lov checked invalid values for us.
3045            The only recognised values so far are evict_by_nid and mds_conn.
3046            Even if something bad goes through, we'd get a -EINVAL from OST
3047            anyway. */
3048
3049         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3050                                                 &RQF_OST_SET_GRANT_INFO :
3051                                                 &RQF_OBD_SET_INFO);
3052         if (req == NULL)
3053                 RETURN(-ENOMEM);
3054
3055         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3056                              RCL_CLIENT, keylen);
3057         if (!KEY_IS(KEY_GRANT_SHRINK))
3058                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3059                                      RCL_CLIENT, vallen);
3060         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3061         if (rc) {
3062                 ptlrpc_request_free(req);
3063                 RETURN(rc);
3064         }
3065
3066         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3067         memcpy(tmp, key, keylen);
3068         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3069                                                         &RMF_OST_BODY :
3070                                                         &RMF_SETINFO_VAL);
3071         memcpy(tmp, val, vallen);
3072
3073         if (KEY_IS(KEY_GRANT_SHRINK)) {
3074                 struct osc_grant_args *aa;
3075                 struct obdo *oa;
3076
3077                 aa = ptlrpc_req_async_args(aa, req);
3078                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3079                 if (!oa) {
3080                         ptlrpc_req_finished(req);
3081                         RETURN(-ENOMEM);
3082                 }
3083                 *oa = ((struct ost_body *)val)->oa;
3084                 aa->aa_oa = oa;
3085                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3086         }
3087
3088         ptlrpc_request_set_replen(req);
3089         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3090                 LASSERT(set != NULL);
3091                 ptlrpc_set_add_req(set, req);
3092                 ptlrpc_check_set(NULL, set);
3093         } else {
3094                 ptlrpcd_add_req(req);
3095         }
3096
3097         RETURN(0);
3098 }
3099 EXPORT_SYMBOL(osc_set_info_async);
3100
3101 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3102                   struct obd_device *obd, struct obd_uuid *cluuid,
3103                   struct obd_connect_data *data, void *localdata)
3104 {
3105         struct client_obd *cli = &obd->u.cli;
3106
3107         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3108                 long lost_grant;
3109                 long grant;
3110
3111                 spin_lock(&cli->cl_loi_list_lock);
3112                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3113                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3114                         /* restore ocd_grant_blkbits as client page bits */
3115                         data->ocd_grant_blkbits = PAGE_SHIFT;
3116                         grant += cli->cl_dirty_grant;
3117                 } else {
3118                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3119                 }
3120                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3121                 lost_grant = cli->cl_lost_grant;
3122                 cli->cl_lost_grant = 0;
3123                 spin_unlock(&cli->cl_loi_list_lock);
3124
3125                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3126                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3127                        data->ocd_version, data->ocd_grant, lost_grant);
3128         }
3129
3130         RETURN(0);
3131 }
3132 EXPORT_SYMBOL(osc_reconnect);
3133
3134 int osc_disconnect(struct obd_export *exp)
3135 {
3136         struct obd_device *obd = class_exp2obd(exp);
3137         int rc;
3138
3139         rc = client_disconnect_export(exp);
3140         /**
3141          * Initially we put del_shrink_grant before disconnect_export, but it
3142          * causes the following problem if setup (connect) and cleanup
3143          * (disconnect) are tangled together.
3144          *      connect p1                     disconnect p2
3145          *   ptlrpc_connect_import
3146          *     ...............               class_manual_cleanup
3147          *                                     osc_disconnect
3148          *                                     del_shrink_grant
3149          *   ptlrpc_connect_interrupt
3150          *     osc_init_grant
3151          *   add this client to shrink list
3152          *                                      cleanup_osc
3153          * Bang! grant shrink thread trigger the shrink. BUG18662
3154          */
3155         osc_del_grant_list(&obd->u.cli);
3156         return rc;
3157 }
3158 EXPORT_SYMBOL(osc_disconnect);
3159
3160 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3161                                  struct hlist_node *hnode, void *arg)
3162 {
3163         struct lu_env *env = arg;
3164         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3165         struct ldlm_lock *lock;
3166         struct osc_object *osc = NULL;
3167         ENTRY;
3168
3169         lock_res(res);
3170         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3171                 if (lock->l_ast_data != NULL && osc == NULL) {
3172                         osc = lock->l_ast_data;
3173                         cl_object_get(osc2cl(osc));
3174                 }
3175
3176                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3177                  * by the 2nd round of ldlm_namespace_clean() call in
3178                  * osc_import_event(). */
3179                 ldlm_clear_cleaned(lock);
3180         }
3181         unlock_res(res);
3182
3183         if (osc != NULL) {
3184                 osc_object_invalidate(env, osc);
3185                 cl_object_put(env, osc2cl(osc));
3186         }
3187
3188         RETURN(0);
3189 }
3190 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3191
3192 static int osc_import_event(struct obd_device *obd,
3193                             struct obd_import *imp,
3194                             enum obd_import_event event)
3195 {
3196         struct client_obd *cli;
3197         int rc = 0;
3198
3199         ENTRY;
3200         LASSERT(imp->imp_obd == obd);
3201
3202         switch (event) {
3203         case IMP_EVENT_DISCON: {
3204                 cli = &obd->u.cli;
3205                 spin_lock(&cli->cl_loi_list_lock);
3206                 cli->cl_avail_grant = 0;
3207                 cli->cl_lost_grant = 0;
3208                 spin_unlock(&cli->cl_loi_list_lock);
3209                 break;
3210         }
3211         case IMP_EVENT_INACTIVE: {
3212                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3213                 break;
3214         }
3215         case IMP_EVENT_INVALIDATE: {
3216                 struct ldlm_namespace *ns = obd->obd_namespace;
3217                 struct lu_env         *env;
3218                 __u16                  refcheck;
3219
3220                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3221
3222                 env = cl_env_get(&refcheck);
3223                 if (!IS_ERR(env)) {
3224                         osc_io_unplug(env, &obd->u.cli, NULL);
3225
3226                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3227                                                  osc_ldlm_resource_invalidate,
3228                                                  env, 0);
3229                         cl_env_put(env, &refcheck);
3230
3231                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3232                 } else
3233                         rc = PTR_ERR(env);
3234                 break;
3235         }
3236         case IMP_EVENT_ACTIVE: {
3237                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3238                 break;
3239         }
3240         case IMP_EVENT_OCD: {
3241                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3242
3243                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3244                         osc_init_grant(&obd->u.cli, ocd);
3245
3246                 /* See bug 7198 */
3247                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3248                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3249
3250                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3251                 break;
3252         }
3253         case IMP_EVENT_DEACTIVATE: {
3254                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3255                 break;
3256         }
3257         case IMP_EVENT_ACTIVATE: {
3258                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3259                 break;
3260         }
3261         default:
3262                 CERROR("Unknown import event %d\n", event);
3263                 LBUG();
3264         }
3265         RETURN(rc);
3266 }
3267
3268 /**
3269  * Determine whether the lock can be canceled before replaying the lock
3270  * during recovery, see bug16774 for detailed information.
3271  *
3272  * \retval zero the lock can't be canceled
3273  * \retval other ok to cancel
3274  */
3275 static int osc_cancel_weight(struct ldlm_lock *lock)
3276 {
3277         /*
3278          * Cancel all unused and granted extent lock.
3279          */
3280         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3281             ldlm_is_granted(lock) &&
3282             osc_ldlm_weigh_ast(lock) == 0)
3283                 RETURN(1);
3284
3285         RETURN(0);
3286 }
3287
3288 static int brw_queue_work(const struct lu_env *env, void *data)
3289 {
3290         struct client_obd *cli = data;
3291
3292         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3293
3294         osc_io_unplug(env, cli, NULL);
3295         RETURN(0);
3296 }
3297
3298 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3299 {
3300         struct client_obd *cli = &obd->u.cli;
3301         void *handler;
3302         int rc;
3303
3304         ENTRY;
3305
3306         rc = ptlrpcd_addref();
3307         if (rc)
3308                 RETURN(rc);
3309
3310         rc = client_obd_setup(obd, lcfg);
3311         if (rc)
3312                 GOTO(out_ptlrpcd, rc);
3313
3314
3315         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3316         if (IS_ERR(handler))
3317                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3318         cli->cl_writeback_work = handler;
3319
3320         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3321         if (IS_ERR(handler))
3322                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3323         cli->cl_lru_work = handler;
3324
3325         rc = osc_quota_setup(obd);
3326         if (rc)
3327                 GOTO(out_ptlrpcd_work, rc);
3328
3329         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3330         osc_update_next_shrink(cli);
3331
3332         RETURN(rc);
3333
3334 out_ptlrpcd_work:
3335         if (cli->cl_writeback_work != NULL) {
3336                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3337                 cli->cl_writeback_work = NULL;
3338         }
3339         if (cli->cl_lru_work != NULL) {
3340                 ptlrpcd_destroy_work(cli->cl_lru_work);
3341                 cli->cl_lru_work = NULL;
3342         }
3343         client_obd_cleanup(obd);
3344 out_ptlrpcd:
3345         ptlrpcd_decref();
3346         RETURN(rc);
3347 }
3348 EXPORT_SYMBOL(osc_setup_common);
3349
3350 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3351 {
3352         struct client_obd *cli = &obd->u.cli;
3353         int                adding;
3354         int                added;
3355         int                req_count;
3356         int                rc;
3357
3358         ENTRY;
3359
3360         rc = osc_setup_common(obd, lcfg);
3361         if (rc < 0)
3362                 RETURN(rc);
3363
3364         rc = osc_tunables_init(obd);
3365         if (rc)
3366                 RETURN(rc);
3367
3368         /*
3369          * We try to control the total number of requests with a upper limit
3370          * osc_reqpool_maxreqcount. There might be some race which will cause
3371          * over-limit allocation, but it is fine.
3372          */
3373         req_count = atomic_read(&osc_pool_req_count);
3374         if (req_count < osc_reqpool_maxreqcount) {
3375                 adding = cli->cl_max_rpcs_in_flight + 2;
3376                 if (req_count + adding > osc_reqpool_maxreqcount)
3377                         adding = osc_reqpool_maxreqcount - req_count;
3378
3379                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3380                 atomic_add(added, &osc_pool_req_count);
3381         }
3382
3383         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3384
3385         spin_lock(&osc_shrink_lock);
3386         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3387         spin_unlock(&osc_shrink_lock);
3388         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3389         cli->cl_import->imp_idle_debug = D_HA;
3390
3391         RETURN(0);
3392 }
3393
3394 int osc_precleanup_common(struct obd_device *obd)
3395 {
3396         struct client_obd *cli = &obd->u.cli;
3397         ENTRY;
3398
3399         /* LU-464
3400          * for echo client, export may be on zombie list, wait for
3401          * zombie thread to cull it, because cli.cl_import will be
3402          * cleared in client_disconnect_export():
3403          *   class_export_destroy() -> obd_cleanup() ->
3404          *   echo_device_free() -> echo_client_cleanup() ->
3405          *   obd_disconnect() -> osc_disconnect() ->
3406          *   client_disconnect_export()
3407          */
3408         obd_zombie_barrier();
3409         if (cli->cl_writeback_work) {
3410                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3411                 cli->cl_writeback_work = NULL;
3412         }
3413
3414         if (cli->cl_lru_work) {
3415                 ptlrpcd_destroy_work(cli->cl_lru_work);
3416                 cli->cl_lru_work = NULL;
3417         }
3418
3419         obd_cleanup_client_import(obd);
3420         RETURN(0);
3421 }
3422 EXPORT_SYMBOL(osc_precleanup_common);
3423
3424 static int osc_precleanup(struct obd_device *obd)
3425 {
3426         ENTRY;
3427
3428         osc_precleanup_common(obd);
3429
3430         ptlrpc_lprocfs_unregister_obd(obd);
3431         RETURN(0);
3432 }
3433
3434 int osc_cleanup_common(struct obd_device *obd)
3435 {
3436         struct client_obd *cli = &obd->u.cli;
3437         int rc;
3438
3439         ENTRY;
3440
3441         spin_lock(&osc_shrink_lock);
3442         list_del(&cli->cl_shrink_list);
3443         spin_unlock(&osc_shrink_lock);
3444
3445         /* lru cleanup */
3446         if (cli->cl_cache != NULL) {
3447                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3448                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3449                 list_del_init(&cli->cl_lru_osc);
3450                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3451                 cli->cl_lru_left = NULL;
3452                 cl_cache_decref(cli->cl_cache);
3453                 cli->cl_cache = NULL;
3454         }
3455
3456         /* free memory of osc quota cache */
3457         osc_quota_cleanup(obd);
3458
3459         rc = client_obd_cleanup(obd);
3460
3461         ptlrpcd_decref();
3462         RETURN(rc);
3463 }
3464 EXPORT_SYMBOL(osc_cleanup_common);
3465
3466 static const struct obd_ops osc_obd_ops = {
3467         .o_owner                = THIS_MODULE,
3468         .o_setup                = osc_setup,
3469         .o_precleanup           = osc_precleanup,
3470         .o_cleanup              = osc_cleanup_common,
3471         .o_add_conn             = client_import_add_conn,
3472         .o_del_conn             = client_import_del_conn,
3473         .o_connect              = client_connect_import,
3474         .o_reconnect            = osc_reconnect,
3475         .o_disconnect           = osc_disconnect,
3476         .o_statfs               = osc_statfs,
3477         .o_statfs_async         = osc_statfs_async,
3478         .o_create               = osc_create,
3479         .o_destroy              = osc_destroy,
3480         .o_getattr              = osc_getattr,
3481         .o_setattr              = osc_setattr,
3482         .o_iocontrol            = osc_iocontrol,
3483         .o_set_info_async       = osc_set_info_async,
3484         .o_import_event         = osc_import_event,
3485         .o_quotactl             = osc_quotactl,
3486 };
3487
3488 static struct shrinker *osc_cache_shrinker;
3489 LIST_HEAD(osc_shrink_list);
3490 DEFINE_SPINLOCK(osc_shrink_lock);
3491
3492 #ifndef HAVE_SHRINKER_COUNT
3493 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3494 {
3495         struct shrink_control scv = {
3496                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3497                 .gfp_mask   = shrink_param(sc, gfp_mask)
3498         };
3499         (void)osc_cache_shrink_scan(shrinker, &scv);
3500
3501         return osc_cache_shrink_count(shrinker, &scv);
3502 }
3503 #endif
3504
3505 static int __init osc_init(void)
3506 {
3507         unsigned int reqpool_size;
3508         unsigned int reqsize;
3509         int rc;
3510         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3511                          osc_cache_shrink_count, osc_cache_shrink_scan);
3512         ENTRY;
3513
3514         /* print an address of _any_ initialized kernel symbol from this
3515          * module, to allow debugging with gdb that doesn't support data
3516          * symbols from modules.*/
3517         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3518
3519         rc = lu_kmem_init(osc_caches);
3520         if (rc)
3521                 RETURN(rc);
3522
3523         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3524                                  LUSTRE_OSC_NAME, &osc_device_type);
3525         if (rc)
3526                 GOTO(out_kmem, rc);
3527
3528         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3529
3530         /* This is obviously too much memory, only prevent overflow here */
3531         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3532                 GOTO(out_type, rc = -EINVAL);
3533
3534         reqpool_size = osc_reqpool_mem_max << 20;
3535
3536         reqsize = 1;
3537         while (reqsize < OST_IO_MAXREQSIZE)
3538                 reqsize = reqsize << 1;
3539
3540         /*
3541          * We don't enlarge the request count in OSC pool according to
3542          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3543          * tried after normal allocation failed. So a small OSC pool won't
3544          * cause much performance degression in most of cases.
3545          */
3546         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3547
3548         atomic_set(&osc_pool_req_count, 0);
3549         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3550                                           ptlrpc_add_rqs_to_pool);
3551
3552         if (osc_rq_pool == NULL)
3553                 GOTO(out_type, rc = -ENOMEM);
3554
3555         rc = osc_start_grant_work();
3556         if (rc != 0)
3557                 GOTO(out_req_pool, rc);
3558
3559         RETURN(rc);
3560
3561 out_req_pool:
3562         ptlrpc_free_rq_pool(osc_rq_pool);
3563 out_type:
3564         class_unregister_type(LUSTRE_OSC_NAME);
3565 out_kmem:
3566         lu_kmem_fini(osc_caches);
3567
3568         RETURN(rc);
3569 }
3570
3571 static void __exit osc_exit(void)
3572 {
3573         osc_stop_grant_work();
3574         remove_shrinker(osc_cache_shrinker);
3575         class_unregister_type(LUSTRE_OSC_NAME);
3576         lu_kmem_fini(osc_caches);
3577         ptlrpc_free_rq_pool(osc_rq_pool);
3578 }
3579
3580 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3581 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3582 MODULE_VERSION(LUSTRE_VERSION_STRING);
3583 MODULE_LICENSE("GPL");
3584
3585 module_init(osc_init);
3586 module_exit(osc_exit);