/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
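
/*
 * Illustrative only: a minimal sketch of a synchronous caller built on
 * top of osc_setattr_async().  The upcall name and completion cookie
 * below are hypothetical, not part of this file:
 *
 *	static int example_setattr_upcall(void *cookie, int rc)
 *	{
 *		complete((struct completion *)cookie);
 *		return rc;
 *	}
 *
 *	DECLARE_COMPLETION_ONSTACK(done);
 *	rc = osc_setattr_async(exp, oa, example_setattr_upcall, &done,
 *			       rqset);
 *
 * Once the set is sent and the reply interpreted,
 * osc_setattr_interpret() invokes the upcall with the final rc.
 */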

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
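
/*
 * Illustrative only: a hedged sketch of building a one-advice header
 * for osc_ladvise_base().  The allocation idiom and field values are
 * assumptions for the example, not a fixed requirement:
 *
 *	struct ladvise_hdr *hdr;
 *
 *	OBD_ALLOC(hdr, offsetof(typeof(*hdr), lah_advise[1]));
 *	hdr->lah_magic = LADVISE_MAGIC;
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLNEED;
 *	hdr->lah_advise[0].lla_start = start;
 *	hdr->lah_advise[0].lla_end = end;
 *	rc = osc_ladvise_base(exp, oa, hdr, upcall, cookie, rqset);
 */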

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the request finishes
 * @cookie:     Opaque caller data passed back to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation, i.e. the standard preallocate operation, is
 * currently supported; other mode flags are not supported yet.
 * ftruncate(2) and truncate(2) are supported via a SETATTR request.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (standard prealloc) and FALLOC_FL_KEEP_SIZE are
         * supported now. Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
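
/*
 * Illustrative only: a hedged sketch of a caller preallocating
 * [start, end) with fallocate semantics.  Encoding the range in
 * o_size/o_blocks mirrors the start/end overloading used for SYNC
 * elsewhere in this file and is an assumption here:
 *
 *	oa->o_size = start;
 *	oa->o_blocks = end;
 *	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *	rc = osc_fallocate_base(exp, oa, upcall, cookie,
 *				FALLOC_FL_KEEP_SIZE);
 */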

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode in the resource found
 * by @oa. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
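
/*
 * Illustrative walk-through of the throttle above, with an assumed
 * cl_max_rpcs_in_flight of 8: a ninth sender bumps the counter to 9,
 * osc_can_send_destroy() returns 0, and the caller sleeps on
 * cl_destroy_waitq (see osc_destroy() below).  Each completing destroy
 * drops the counter in osc_destroy_interpret() and wakes the queue, so
 * the waiter retries osc_can_send_destroy() until it wins a slot.  The
 * decrement path above also wakes the queue when it sees the counter
 * changed between the paired atomics, so no wakeup can be lost.
 */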

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
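
/*
 * Worked example for the o_undirty computation above, with assumed
 * values: PAGE_SHIFT == 12 (4KiB pages), cl_max_pages_per_rpc == 256
 * and cl_max_rpcs_in_flight == 8:
 *
 *	nrpages = 256 * (8 + 1) = 2304 (if above cl_dirty_max_pages)
 *	undirty = 2304 << 12 = 9 MiB
 *
 * With GRANT_PARAM and an assumed cl_max_extent_pages of 1024, extent
 * tax is added for nrextents = (2304 + 1023) / 1024 = 3 extents.  The
 * result is finally clamped below OBD_MAX_GRANT, keeping a margin of
 * several PTLRPC_MAX_BRW_SIZE units for the server's own extent tax.
 */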

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
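
/*
 * Worked example, values assumed for illustration: with 4KiB pages,
 * cl_max_pages_per_rpc == 256 (1 MiB RPCs) and
 * cl_max_rpcs_in_flight == 8, the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB; once avail_grant is already at or below
 * that, the target drops to a single RPC's worth, 1 MiB.
 */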

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
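
/*
 * Illustrative timing, values assumed: with a grant shrink interval of
 * 1200s, a client whose import is FULL and whose avail_grant exceeds
 * one RPC's worth becomes eligible up to 5 seconds before
 * cl_next_shrink_grant expires; otherwise the deadline is simply
 * pushed out by another interval via osc_update_next_shrink().
 */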

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
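
/*
 * Worked example for the GRANT_PARAM path above, values assumed for
 * illustration: with PAGE_SHIFT == 12 and ocd_grant_blkbits == 16,
 * cl_chunkbits becomes 16, so a chunk is 16 pages and
 * chunk_mask == ~0xf; a cl_max_pages_per_rpc of 100 is rounded up as
 * (100 + 15) & ~15 == 112.  With ocd_grant_max_blks == 2048, the
 * maximum extent size is 2048 << 16 == 128 MiB, i.e.
 * cl_max_extent_pages == 32768.
 */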

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
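
/*
 * Worked example, assumed values: three 4KiB pages requested and
 * nob_read == 6144.  The first loop consumes the fully-read first
 * page, finds EOF inside the second page and zeroes its last
 * 4096 - 2048 bytes through kmap(); the second loop then zero-fills
 * the third page entirely.
 */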

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return p1->off + p1->count == p2->off;
}
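
/*
 * Illustrative example, assumed values: two 4KiB pages at offsets 0
 * and 4096 with identical flags merge into a single niobuf since
 * p1->off + p1->count == p2->off.  The same pages differing only in
 * OBD_BRW_SYNC do not merge, but stay silent because that flag is in
 * the known-safe mask above; a difference outside the mask would
 * additionally trigger the CWARN().
 */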

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The number of guard slots left should be enough to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}
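
/*
 * Illustrative only: how the dispatch above is typically consumed,
 * with assumed values.  A T10 type such as OBD_CKSUM_T10CRC512 makes
 * obd_t10_cksum2dif() return a guard function and a 512-byte sector
 * size, routing to osc_checksum_bulk_t10pi(); a plain type leaves
 * fn == NULL and takes the legacy path:
 *
 *	u32 cksum;
 *	rc = osc_checksum_bulk_rw(obd_name, OBD_CKSUM_ADLER, nob,
 *				  page_count, pga, OST_WRITE, &cksum);
 */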
1349
1350 static inline void osc_release_bounce_pages(struct brw_page **pga,
1351                                             u32 page_count)
1352 {
1353 #ifdef HAVE_LUSTRE_CRYPTO
1354         int i;
1355
1356         for (i = 0; i < page_count; i++) {
1357                 if (!pga[i]->pg->mapping)
1358                         /* bounce pages are unmapped */
1359                         llcrypt_finalize_bounce_page(&pga[i]->pg);
1360                 pga[i]->count -= pga[i]->bp_count_diff;
1361                 pga[i]->off += pga[i]->bp_off_diff;
1362         }
1363 #endif
1364 }
1365
1366 static int
1367 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1368                      u32 page_count, struct brw_page **pga,
1369                      struct ptlrpc_request **reqp, int resend)
1370 {
1371         struct ptlrpc_request *req;
1372         struct ptlrpc_bulk_desc *desc;
1373         struct ost_body *body;
1374         struct obd_ioobj *ioobj;
1375         struct niobuf_remote *niobuf;
1376         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1377         struct osc_brw_async_args *aa;
1378         struct req_capsule *pill;
1379         struct brw_page *pg_prev;
1380         void *short_io_buf;
1381         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1382         struct inode *inode;
1383
1384         ENTRY;
1385         inode = page2inode(pga[0]->pg);
1386         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1387                 RETURN(-ENOMEM); /* Recoverable */
1388         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1389                 RETURN(-EINVAL); /* Fatal */
1390
1391         if ((cmd & OBD_BRW_WRITE) != 0) {
1392                 opc = OST_WRITE;
1393                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1394                                                 osc_rq_pool,
1395                                                 &RQF_OST_BRW_WRITE);
1396         } else {
1397                 opc = OST_READ;
1398                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1399         }
1400         if (req == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1404                 for (i = 0; i < page_count; i++) {
1405                         struct brw_page *pg = pga[i];
1406                         struct page *data_page = NULL;
1407                         bool retried = false;
1408                         bool lockedbymyself;
1409                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1410
1411 retry_encrypt:
1412                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1413                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1414                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1415                         /* The page can already be locked when we arrive here.
1416                          * This is possible when cl_page_assume/vvp_page_assume
1417                          * is stuck on wait_on_page_writeback with page lock
1418                          * held. In this case there is no risk for the lock to
1419                          * be released while we are doing our encryption
1420                          * processing, because writeback against that page will
1421                          * end in vvp_page_completion_write/cl_page_completion,
1422                          * which means only once the page is fully processed.
1423                          */
1424                         lockedbymyself = trylock_page(pg->pg);
1425                         data_page =
1426                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1427                                                                  nunits, 0,
1428                                                                  GFP_NOFS);
1429                         if (lockedbymyself)
1430                                 unlock_page(pg->pg);
1431                         if (IS_ERR(data_page)) {
1432                                 rc = PTR_ERR(data_page);
1433                                 if (rc == -ENOMEM && !retried) {
1434                                         retried = true;
1435                                         rc = 0;
1436                                         goto retry_encrypt;
1437                                 }
1438                                 ptlrpc_request_free(req);
1439                                 RETURN(rc);
1440                         }
1441                         pg->pg = data_page;
1442                         /* there should be no gap in the middle of page array */
1443                         if (i == page_count - 1) {
1444                                 struct osc_async_page *oap = brw_page2oap(pg);
1445
1446                                 oa->o_size = oap->oap_count +
1447                                         oap->oap_obj_off + oap->oap_page_off;
1448                         }
1449                         /* len is forced to nunits, and relative offset to 0
1450                          * so store the old, clear text info
1451                          */
1452                         pg->bp_count_diff = nunits - pg->count;
1453                         pg->count = nunits;
1454                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1455                         pg->off = pg->off & PAGE_MASK;
1456                 }
1457         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1458                 for (i = 0; i < page_count; i++) {
1459                         struct brw_page *pg = pga[i];
1460                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1461
1462                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1463                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1464                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1465                         /* count/off are forced to cover the whole encryption
1466                          * unit size so that all encrypted data is stored on the
1467                          * OST, so adjust bp_{count,off}_diff for the size of
1468                          * the clear text.
1469                          */
1470                         pg->bp_count_diff = nunits - pg->count;
1471                         pg->count = nunits;
1472                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1473                         pg->off = pg->off & PAGE_MASK;
1474                 }
1475         }
1476
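        /* Count the remote niobufs needed: pages that can_merge_pages()
         * considers mergeable (contiguous offsets, compatible flags) share a
         * single niobuf, so niocount <= page_count. */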
1477         for (niocount = i = 1; i < page_count; i++) {
1478                 if (!can_merge_pages(pga[i - 1], pga[i]))
1479                         niocount++;
1480         }
1481
1482         pill = &req->rq_pill;
1483         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1484                              sizeof(*ioobj));
1485         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1486                              niocount * sizeof(*niobuf));
1487
1488         for (i = 0; i < page_count; i++) {
1489                 short_io_size += pga[i]->count;
1490                 if (!inode || !IS_ENCRYPTED(inode)) {
1491                         pga[i]->bp_count_diff = 0;
1492                         pga[i]->bp_off_diff = 0;
1493                 }
1494         }
1495
1496         /* Check if read/write is small enough to be a short io. */
1497         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1498             !imp_connect_shortio(cli->cl_import))
1499                 short_io_size = 0;
1500
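        /* With short i/o the data travels inline in the RPC message itself
         * instead of via a separate bulk transfer: for a write it is packed
         * into the request, for a read it comes back in the reply, hence the
         * asymmetric buffer sizing below. */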
1501         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1502                              opc == OST_READ ? 0 : short_io_size);
1503         if (opc == OST_READ)
1504                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1505                                      short_io_size);
1506
1507         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1508         if (rc) {
1509                 ptlrpc_request_free(req);
1510                 RETURN(rc);
1511         }
1512         osc_set_io_portal(req);
1513
1514         ptlrpc_at_set_req_timeout(req);
1515         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1516          * retry logic */
1517         req->rq_no_retry_einprogress = 1;
1518
1519         if (short_io_size != 0) {
1520                 desc = NULL;
1521                 short_io_buf = NULL;
1522                 goto no_bulk;
1523         }
1524
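        /* Bulk transfers are driven from the server side: for a write the OST
         * GETs the data from the client (PTLRPC_BULK_GET_SOURCE), for a read
         * it PUTs the data into the client's pages (PTLRPC_BULK_PUT_SINK). */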
1525         desc = ptlrpc_prep_bulk_imp(req, page_count,
1526                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1527                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1528                         PTLRPC_BULK_PUT_SINK),
1529                 OST_BULK_PORTAL,
1530                 &ptlrpc_bulk_kiov_pin_ops);
1531
1532         if (desc == NULL)
1533                 GOTO(out, rc = -ENOMEM);
1534         /* NB the request now owns desc and will free it when the request is freed */
1535 no_bulk:
1536         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1537         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1538         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1539         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1540
1541         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1542
1543         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1544          * and from_kgid(), because these RPCs are asynchronous. Fortunately,
1545          * variable oa contains valid o_uid and o_gid for these two operations.
1546          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1547          * OBD_MD_FLUID and OBD_MD_FLGID are not set, in order to avoid
1548          * breaking other process logic */
1549         body->oa.o_uid = oa->o_uid;
1550         body->oa.o_gid = oa->o_gid;
1551
1552         obdo_to_ioobj(oa, ioobj);
1553         ioobj->ioo_bufcnt = niocount;
1554         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1555          * bulks that might be sent for this request.  The actual number is
1556          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1557          * sends "max - 1" for compatibility with old clients sending "0", and
1558          * also so the actual maximum is a power-of-two number, not one less. LU-1431 */
1559         if (desc != NULL)
1560                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1561         else /* short io */
1562                 ioobj_max_brw_set(ioobj, 0);
1563
1564         if (short_io_size != 0) {
1565                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1566                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1567                         body->oa.o_flags = 0;
1568                 }
1569                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1570                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1571                        short_io_size);
1572                 if (opc == OST_WRITE) {
1573                         short_io_buf = req_capsule_client_get(pill,
1574                                                               &RMF_SHORT_IO);
1575                         LASSERT(short_io_buf != NULL);
1576                 }
1577         }
1578
1579         LASSERT(page_count > 0);
1580         pg_prev = pga[0];
1581         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1582                 struct brw_page *pg = pga[i];
1583                 int poff = pg->off & ~PAGE_MASK;
1584
1585                 LASSERT(pg->count > 0);
1586                 /* make sure there is no gap in the middle of page array */
1587                 LASSERTF(page_count == 1 ||
1588                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1589                           ergo(i > 0 && i < page_count - 1,
1590                                poff == 0 && pg->count == PAGE_SIZE)   &&
1591                           ergo(i == page_count - 1, poff == 0)),
1592                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1593                          i, page_count, pg, pg->off, pg->count);
1594                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1595                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1596                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1597                          i, page_count,
1598                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1599                          pg_prev->pg, page_private(pg_prev->pg),
1600                          pg_prev->pg->index, pg_prev->off);
1601                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1602                         (pg->flag & OBD_BRW_SRVLOCK));
1603                 if (short_io_size != 0 && opc == OST_WRITE) {
1604                         unsigned char *ptr = kmap_atomic(pg->pg);
1605
1606                         LASSERT(short_io_size >= requested_nob + pg->count);
1607                         memcpy(short_io_buf + requested_nob,
1608                                ptr + poff,
1609                                pg->count);
1610                         kunmap_atomic(ptr);
1611                 } else if (short_io_size == 0) {
1612                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1613                                                          pg->count);
1614                 }
1615                 requested_nob += pg->count;
1616
1617                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1618                         niobuf--;
1619                         niobuf->rnb_len += pg->count;
1620                 } else {
1621                         niobuf->rnb_offset = pg->off;
1622                         niobuf->rnb_len    = pg->count;
1623                         niobuf->rnb_flags  = pg->flag;
1624                 }
1625                 pg_prev = pg;
1626         }
1627
1628         LASSERTF((void *)(niobuf - niocount) ==
1629                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1630                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1631                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1632
1633         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1634         if (resend) {
1635                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1636                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1637                         body->oa.o_flags = 0;
1638                 }
1639                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1640         }
1641
1642         if (osc_should_shrink_grant(cli))
1643                 osc_shrink_grant_local(cli, &body->oa);
1644
1645         /* size[REQ_REC_OFF] still sizeof (*body) */
1646         if (opc == OST_WRITE) {
1647                 if (cli->cl_checksum &&
1648                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1649                         /* store cl_cksum_type in a local variable since
1650                          * it can be changed via lprocfs */
1651                         enum cksum_types cksum_type = cli->cl_cksum_type;
1652
1653                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1654                                 body->oa.o_flags = 0;
1655
1656                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1657                                                                 cksum_type);
1658                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1659
1660                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1661                                                   requested_nob, page_count,
1662                                                   pga, OST_WRITE,
1663                                                   &body->oa.o_cksum);
1664                         if (rc < 0) {
1665                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1666                                        rc);
1667                                 GOTO(out, rc);
1668                         }
1669                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1670                                body->oa.o_cksum);
1671
1672                         /* save this in 'oa', too, for later checking */
1673                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1674                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1675                                                            cksum_type);
1676                 } else {
1677                         /* clear out the checksum flag, in case this is a
1678                          * resend but cl_checksum is no longer set. b=11238 */
1679                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1680                 }
1681                 oa->o_cksum = body->oa.o_cksum;
1682                 /* 1 RC per niobuf */
1683                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1684                                      sizeof(__u32) * niocount);
1685         } else {
1686                 if (cli->cl_checksum &&
1687                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1688                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1689                                 body->oa.o_flags = 0;
1690                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1691                                 cli->cl_cksum_type);
1692                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1693                 }
1694
1695                 /* The client cksum has already been copied to the wire obdo
1696                  * in the previous lustre_set_wire_obdo(); in case a bulk-read
1697                  * is being resent due to a cksum error, this allows the server
1698                  * to check+dump the pages on its side */
1699         }
1700         ptlrpc_request_set_replen(req);
1701
1702         aa = ptlrpc_req_async_args(aa, req);
1703         aa->aa_oa = oa;
1704         aa->aa_requested_nob = requested_nob;
1705         aa->aa_nio_count = niocount;
1706         aa->aa_page_count = page_count;
1707         aa->aa_resends = 0;
1708         aa->aa_ppga = pga;
1709         aa->aa_cli = cli;
1710         INIT_LIST_HEAD(&aa->aa_oaps);
1711
1712         *reqp = req;
1713         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1714         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1715                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1716                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1717         RETURN(0);
1718
1719  out:
1720         ptlrpc_req_finished(req);
1721         RETURN(rc);
1722 }
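
/* NB: osc_brw_prep_request() requires @pga sorted by ascending page offset
 * (enforced by the LASSERTFs above); callers such as osc_build_rpc() run
 * sort_brw_pages() on the array first. */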
1723
1724 char dbgcksum_file_name[PATH_MAX];
1725
1726 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1727                                 struct brw_page **pga, __u32 server_cksum,
1728                                 __u32 client_cksum)
1729 {
1730         struct file *filp;
1731         int rc, i;
1732         unsigned int len;
1733         char *buf;
1734
1735         /* will only keep a dump of the pages on the first error for the same
1736          * range in the file/fid, not during the resends/retries. */
1737         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1738                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1739                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1740                   libcfs_debug_file_path_arr :
1741                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1742                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1743                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1744                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1745                  pga[0]->off,
1746                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1747                  client_cksum, server_cksum);
1748         filp = filp_open(dbgcksum_file_name,
1749                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1750         if (IS_ERR(filp)) {
1751                 rc = PTR_ERR(filp);
1752                 if (rc == -EEXIST)
1753                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1754                                "checksum error: rc = %d\n", dbgcksum_file_name,
1755                                rc);
1756                 else
1757                         CERROR("%s: can't open to dump pages with checksum "
1758                                "error: rc = %d\n", dbgcksum_file_name, rc);
1759                 return;
1760         }
1761
1762         for (i = 0; i < page_count; i++) {
1763                 len = pga[i]->count;
1764                 buf = kmap(pga[i]->pg);
1765                 while (len != 0) {
1766                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1767                         if (rc < 0) {
1768                                 CERROR("%s: wanted to write %u but got %d "
1769                                        "error\n", dbgcksum_file_name, len, rc);
1770                                 break;
1771                         }
1772                         len -= rc;
1773                         buf += rc;
1774                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1775                                dbgcksum_file_name, rc);
1776                 }
1777                 kunmap(pga[i]->pg);
1778         }
1779
1780         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1781         if (rc)
1782                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1783         filp_close(filp, NULL);
1784 }
1785
1786 static int
1787 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1788                      __u32 client_cksum, __u32 server_cksum,
1789                      struct osc_brw_async_args *aa)
1790 {
1791         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1792         enum cksum_types cksum_type;
1793         obd_dif_csum_fn *fn = NULL;
1794         int sector_size = 0;
1795         __u32 new_cksum;
1796         char *msg;
1797         int rc;
1798
1799         if (server_cksum == client_cksum) {
1800                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1801                 return 0;
1802         }
1803
1804         if (aa->aa_cli->cl_checksum_dump)
1805                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1806                                     server_cksum, client_cksum);
1807
1808         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1809                                            oa->o_flags : 0);
1810
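        /* T10-PI checksum types need the matching DIF function and sector
         * size; for all other types fn stays NULL and the generic
         * osc_checksum_bulk() below is used instead. */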
1811         switch (cksum_type) {
1812         case OBD_CKSUM_T10IP512:
1813                 fn = obd_dif_ip_fn;
1814                 sector_size = 512;
1815                 break;
1816         case OBD_CKSUM_T10IP4K:
1817                 fn = obd_dif_ip_fn;
1818                 sector_size = 4096;
1819                 break;
1820         case OBD_CKSUM_T10CRC512:
1821                 fn = obd_dif_crc_fn;
1822                 sector_size = 512;
1823                 break;
1824         case OBD_CKSUM_T10CRC4K:
1825                 fn = obd_dif_crc_fn;
1826                 sector_size = 4096;
1827                 break;
1828         default:
1829                 break;
1830         }
1831
1832         if (fn)
1833                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1834                                              aa->aa_page_count, aa->aa_ppga,
1835                                              OST_WRITE, fn, sector_size,
1836                                              &new_cksum);
1837         else
1838                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1839                                        aa->aa_ppga, OST_WRITE, cksum_type,
1840                                        &new_cksum);
1841
1842         if (rc < 0)
1843                 msg = "failed to calculate the client write checksum";
1844         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1845                 msg = "the server did not use the checksum type specified in "
1846                       "the original request - likely a protocol problem";
1847         else if (new_cksum == server_cksum)
1848                 msg = "changed on the client after we checksummed it - "
1849                       "likely false positive due to mmap IO (bug 11742)";
1850         else if (new_cksum == client_cksum)
1851                 msg = "changed in transit before arrival at OST";
1852         else
1853                 msg = "changed in transit AND doesn't match the original - "
1854                       "likely false positive due to mmap IO (bug 11742)";
1855
1856         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1857                            DFID " object "DOSTID" extent [%llu-%llu], original "
1858                            "client csum %x (type %x), server csum %x (type %x),"
1859                            " client csum now %x\n",
1860                            obd_name, msg, libcfs_nid2str(peer->nid),
1861                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1862                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1863                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1864                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1865                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1866                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1867                            client_cksum,
1868                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1869                            server_cksum, cksum_type, new_cksum);
1870         return 1;
1871 }
1872
1873 /* Note rc enters this function as the number of bytes transferred */
1874 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1875 {
1876         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1877         struct client_obd *cli = aa->aa_cli;
1878         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1879         const struct lnet_process_id *peer =
1880                 &req->rq_import->imp_connection->c_peer;
1881         struct ost_body *body;
1882         u32 client_cksum = 0;
1883         struct inode *inode;
1884
1885         ENTRY;
1886
1887         if (rc < 0 && rc != -EDQUOT) {
1888                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1889                 RETURN(rc);
1890         }
1891
1892         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1893         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1894         if (body == NULL) {
1895                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1896                 RETURN(-EPROTO);
1897         }
1898
1899         /* set/clear over quota flag for a uid/gid/projid */
1900         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1901             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1902                 unsigned qid[LL_MAXQUOTAS] = {
1903                                          body->oa.o_uid, body->oa.o_gid,
1904                                          body->oa.o_projid };
1905                 CDEBUG(D_QUOTA,
1906                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1907                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1908                        body->oa.o_valid, body->oa.o_flags);
1909                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1910                                 body->oa.o_flags);
1911         }
1912
1913         osc_update_grant(cli, body);
1914
1915         if (rc < 0)
1916                 RETURN(rc);
1917
1918         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1919                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1920
1921         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1922                 if (rc > 0) {
1923                         CERROR("%s: unexpected positive size %d\n",
1924                                obd_name, rc);
1925                         RETURN(-EPROTO);
1926                 }
1927
1928                 if (req->rq_bulk != NULL &&
1929                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1930                         RETURN(-EAGAIN);
1931
1932                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1933                     check_write_checksum(&body->oa, peer, client_cksum,
1934                                          body->oa.o_cksum, aa))
1935                         RETURN(-EAGAIN);
1936
1937                 rc = check_write_rcs(req, aa->aa_requested_nob,
1938                                      aa->aa_nio_count, aa->aa_page_count,
1939                                      aa->aa_ppga);
1940                 GOTO(out, rc);
1941         }
1942
1943         /* The rest of this function executes only for OST_READs */
1944
1945         if (req->rq_bulk == NULL) {
1946                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1947                                           RCL_SERVER);
1948                 LASSERT(rc == req->rq_status);
1949         } else {
1950                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1951                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1952         }
1953         if (rc < 0)
1954                 GOTO(out, rc = -EAGAIN);
1955
1956         if (rc > aa->aa_requested_nob) {
1957                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1958                        rc, aa->aa_requested_nob);
1959                 RETURN(-EPROTO);
1960         }
1961
1962         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1963                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1964                        rc, req->rq_bulk->bd_nob_transferred);
1965                 RETURN(-EPROTO);
1966         }
1967
1968         if (req->rq_bulk == NULL) {
1969                 /* short io */
1970                 int nob, pg_count, i = 0;
1971                 unsigned char *buf;
1972
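                /* Copy the inline reply buffer back into the brw pages, page
                 * by page, until the returned bytes are consumed. */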
1973                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1974                 pg_count = aa->aa_page_count;
1975                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1976                                                    rc);
1977                 nob = rc;
1978                 while (nob > 0 && pg_count > 0) {
1979                         unsigned char *ptr;
1980                         int count = aa->aa_ppga[i]->count > nob ?
1981                                     nob : aa->aa_ppga[i]->count;
1982
1983                         CDEBUG(D_CACHE, "page %p count %d\n",
1984                                aa->aa_ppga[i]->pg, count);
1985                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1986                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1987                                count);
1988                         kunmap_atomic((void *) ptr);
1989
1990                         buf += count;
1991                         nob -= count;
1992                         i++;
1993                         pg_count--;
1994                 }
1995         }
1996
1997         if (rc < aa->aa_requested_nob)
1998                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1999
2000         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2001                 static int cksum_counter;
2002                 u32        server_cksum = body->oa.o_cksum;
2003                 char      *via = "";
2004                 char      *router = "";
2005                 enum cksum_types cksum_type;
2006                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2007                         body->oa.o_flags : 0;
2008
2009                 cksum_type = obd_cksum_type_unpack(o_flags);
2010                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2011                                           aa->aa_page_count, aa->aa_ppga,
2012                                           OST_READ, &client_cksum);
2013                 if (rc < 0)
2014                         GOTO(out, rc);
2015
2016                 if (req->rq_bulk != NULL &&
2017                     peer->nid != req->rq_bulk->bd_sender) {
2018                         via = " via ";
2019                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2020                 }
2021
2022                 if (server_cksum != client_cksum) {
2023                         struct ost_body *clbody;
2024                         u32 page_count = aa->aa_page_count;
2025
2026                         clbody = req_capsule_client_get(&req->rq_pill,
2027                                                         &RMF_OST_BODY);
2028                         if (cli->cl_checksum_dump)
2029                                 dump_all_bulk_pages(&clbody->oa, page_count,
2030                                                     aa->aa_ppga, server_cksum,
2031                                                     client_cksum);
2032
2033                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2034                                            "%s%s%s inode "DFID" object "DOSTID
2035                                            " extent [%llu-%llu], client %x, "
2036                                            "server %x, cksum_type %x\n",
2037                                            obd_name,
2038                                            libcfs_nid2str(peer->nid),
2039                                            via, router,
2040                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2041                                                 clbody->oa.o_parent_seq : 0ULL,
2042                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2043                                                 clbody->oa.o_parent_oid : 0,
2044                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2045                                                 clbody->oa.o_parent_ver : 0,
2046                                            POSTID(&body->oa.o_oi),
2047                                            aa->aa_ppga[0]->off,
2048                                            aa->aa_ppga[page_count-1]->off +
2049                                            aa->aa_ppga[page_count-1]->count - 1,
2050                                            client_cksum, server_cksum,
2051                                            cksum_type);
2052                         cksum_counter = 0;
2053                         aa->aa_oa->o_cksum = client_cksum;
2054                         rc = -EAGAIN;
2055                 } else {
2056                         cksum_counter++;
2057                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2058                         rc = 0;
2059                 }
2060         } else if (unlikely(client_cksum)) {
2061                 static int cksum_missed;
2062
2063                 cksum_missed++;
2064                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2065                         CERROR("%s: checksum %u requested from %s but not sent\n",
2066                                obd_name, cksum_missed,
2067                                libcfs_nid2str(peer->nid));
2068         } else {
2069                 rc = 0;
2070         }
2071
2072         inode = page2inode(aa->aa_ppga[0]->pg);
2073         if (inode && IS_ENCRYPTED(inode)) {
2074                 int idx;
2075
2076                 if (!llcrypt_has_encryption_key(inode)) {
2077                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2078                         GOTO(out, rc);
2079                 }
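                /* Decrypt the returned data in place, one
                 * LUSTRE_ENCRYPTION_UNIT_SIZE chunk at a time. */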
2080                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2081                         struct brw_page *pg = aa->aa_ppga[idx];
2082                         unsigned int offs = 0;
2083
2084                         while (offs < PAGE_SIZE) {
2085                                 /* do not decrypt if page is all 0s */
2086                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2087                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2088                                         /* if page is empty forward info to
2089                                          * upper layers (ll_io_zero_page) by
2090                                          * clearing PagePrivate2
2091                                          */
2092                                         if (!offs)
2093                                                 ClearPagePrivate2(pg->pg);
2094                                         break;
2095                                 }
2096
2097                                 /* The page is already locked when we arrive here,
2098                                  * except when we deal with a twisted page for
2099                                  * specific Direct IO support, in which case the
2100                                  * PageChecked flag is set on the page.
2101                                  */
2102                                 if (PageChecked(pg->pg))
2103                                         lock_page(pg->pg);
2104                                 rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2105                                                     LUSTRE_ENCRYPTION_UNIT_SIZE,
2106                                                                       offs);
2107                                 if (PageChecked(pg->pg))
2108                                         unlock_page(pg->pg);
2109                                 if (rc)
2110                                         GOTO(out, rc);
2111
2112                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2113                         }
2114                 }
2115         }
2116
2117 out:
2118         if (rc >= 0)
2119                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2120                                      aa->aa_oa, &body->oa);
2121
2122         RETURN(rc);
2123 }
2124
2125 static int osc_brw_redo_request(struct ptlrpc_request *request,
2126                                 struct osc_brw_async_args *aa, int rc)
2127 {
2128         struct ptlrpc_request *new_req;
2129         struct osc_brw_async_args *new_aa;
2130         struct osc_async_page *oap;
2131         ENTRY;
2132
2133         /* The message below is checked in replay-ost-single.sh test_8ae */
2134         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2135                   "redo for recoverable error %d", rc);
2136
2137         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2138                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2139                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2140                                   aa->aa_ppga, &new_req, 1);
2141         if (rc)
2142                 RETURN(rc);
2143
2144         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2145                 if (oap->oap_request != NULL) {
2146                         LASSERTF(request == oap->oap_request,
2147                                  "request %p != oap_request %p\n",
2148                                  request, oap->oap_request);
2149                 }
2150         }
2151         /*
2152          * New request takes over pga and oaps from old request.
2153          * Note that copying a list_head doesn't work, need to move it...
2154          */
2155         aa->aa_resends++;
2156         new_req->rq_interpret_reply = request->rq_interpret_reply;
2157         new_req->rq_async_args = request->rq_async_args;
2158         new_req->rq_commit_cb = request->rq_commit_cb;
2159         /* cap resend delay to the current request timeout, this is similar to
2160          * what ptlrpc does (see after_reply()) */
2161         if (aa->aa_resends > new_req->rq_timeout)
2162                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2163         else
2164                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2165         new_req->rq_generation_set = 1;
2166         new_req->rq_import_generation = request->rq_import_generation;
2167
2168         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2169
2170         INIT_LIST_HEAD(&new_aa->aa_oaps);
2171         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2172         INIT_LIST_HEAD(&new_aa->aa_exts);
2173         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2174         new_aa->aa_resends = aa->aa_resends;
2175
2176         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2177                 if (oap->oap_request) {
2178                         ptlrpc_req_finished(oap->oap_request);
2179                         oap->oap_request = ptlrpc_request_addref(new_req);
2180                 }
2181         }
2182
2183         /* XXX: This code will run into problems if we ever support adding
2184          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
2185          * waiting for all of them to finish. We should inherit the request
2186          * set from the old request. */
2187         ptlrpcd_add_req(new_req);
2188
2189         DEBUG_REQ(D_INFO, new_req, "new request");
2190         RETURN(0);
2191 }
2192
2193 /*
2194  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2195  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2196  * fine for our small page arrays and doesn't require allocation.  It's an
2197  * insertion sort that swaps elements that are strides apart, shrinking the
2198  * stride down until it's '1' and the array is sorted.
2199  */
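/* For example, with num = 100 the 3x+1 stride sequence grows 1, 4, 13, 40,
 * 121; the do-while below then sorts with strides 40, 13, 4 and finally 1. */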
2200 static void sort_brw_pages(struct brw_page **array, int num)
2201 {
2202         int stride, i, j;
2203         struct brw_page *tmp;
2204
2205         if (num == 1)
2206                 return;
2207         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2208                 ;
2209
2210         do {
2211                 stride /= 3;
2212                 for (i = stride ; i < num ; i++) {
2213                         tmp = array[i];
2214                         j = i;
2215                         while (j >= stride && array[j - stride]->off > tmp->off) {
2216                                 array[j] = array[j - stride];
2217                                 j -= stride;
2218                         }
2219                         array[j] = tmp;
2220                 }
2221         } while (stride > 1);
2222 }
2223
2224 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2225 {
2226         LASSERT(ppga != NULL);
2227         OBD_FREE_PTR_ARRAY(ppga, count);
2228 }
2229
2230 static int brw_interpret(const struct lu_env *env,
2231                          struct ptlrpc_request *req, void *args, int rc)
2232 {
2233         struct osc_brw_async_args *aa = args;
2234         struct osc_extent *ext;
2235         struct osc_extent *tmp;
2236         struct client_obd *cli = aa->aa_cli;
2237         unsigned long transferred = 0;
2238
2239         ENTRY;
2240
2241         rc = osc_brw_fini_request(req, rc);
2242         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2243
2244         /* restore clear text pages */
2245         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2246
2247         /*
2248          * When server returns -EINPROGRESS, client should always retry
2249          * regardless of the number of times the bulk was resent already.
2250          */
2251         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2252                 if (req->rq_import_generation !=
2253                     req->rq_import->imp_generation) {
2254                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2255                                ""DOSTID", rc = %d.\n",
2256                                req->rq_import->imp_obd->obd_name,
2257                                POSTID(&aa->aa_oa->o_oi), rc);
2258                 } else if (rc == -EINPROGRESS ||
2259                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2260                         rc = osc_brw_redo_request(req, aa, rc);
2261                 } else {
2262                         CERROR("%s: too many resent retries for object: "
2263                                "%llu:%llu, rc = %d.\n",
2264                                req->rq_import->imp_obd->obd_name,
2265                                POSTID(&aa->aa_oa->o_oi), rc);
2266                 }
2267
2268                 if (rc == 0)
2269                         RETURN(0);
2270                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2271                         rc = -EIO;
2272         }
2273
2274         if (rc == 0) {
2275                 struct obdo *oa = aa->aa_oa;
2276                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2277                 unsigned long valid = 0;
2278                 struct cl_object *obj;
2279                 struct osc_async_page *last;
2280
2281                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2282                 obj = osc2cl(last->oap_obj);
2283
2284                 cl_object_attr_lock(obj);
2285                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2286                         attr->cat_blocks = oa->o_blocks;
2287                         valid |= CAT_BLOCKS;
2288                 }
2289                 if (oa->o_valid & OBD_MD_FLMTIME) {
2290                         attr->cat_mtime = oa->o_mtime;
2291                         valid |= CAT_MTIME;
2292                 }
2293                 if (oa->o_valid & OBD_MD_FLATIME) {
2294                         attr->cat_atime = oa->o_atime;
2295                         valid |= CAT_ATIME;
2296                 }
2297                 if (oa->o_valid & OBD_MD_FLCTIME) {
2298                         attr->cat_ctime = oa->o_ctime;
2299                         valid |= CAT_CTIME;
2300                 }
2301
2302                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2303                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2304                         loff_t last_off = last->oap_count + last->oap_obj_off +
2305                                 last->oap_page_off;
2306
2307                         /* Change the file size if this is an out-of-quota or
2308                          * direct IO write and it extends the file size */
2309                         if (loi->loi_lvb.lvb_size < last_off) {
2310                                 attr->cat_size = last_off;
2311                                 valid |= CAT_SIZE;
2312                         }
2313                         /* Extend KMS if it's not a lockless write */
2314                         if (loi->loi_kms < last_off &&
2315                             oap2osc_page(last)->ops_srvlock == 0) {
2316                                 attr->cat_kms = last_off;
2317                                 valid |= CAT_KMS;
2318                         }
2319                 }
2320
2321                 if (valid != 0)
2322                         cl_object_attr_update(env, obj, attr, valid);
2323                 cl_object_attr_unlock(obj);
2324         }
2325         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2326         aa->aa_oa = NULL;
2327
2328         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2329                 osc_inc_unstable_pages(req);
2330
2331         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2332                 list_del_init(&ext->oe_link);
2333                 osc_extent_finish(env, ext, 1,
2334                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2335         }
2336         LASSERT(list_empty(&aa->aa_exts));
2337         LASSERT(list_empty(&aa->aa_oaps));
2338
2339         transferred = (req->rq_bulk == NULL ? /* short io */
2340                        aa->aa_requested_nob :
2341                        req->rq_bulk->bd_nob_transferred);
2342
2343         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2344         ptlrpc_lprocfs_brw(req, transferred);
2345
2346         spin_lock(&cli->cl_loi_list_lock);
2347         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2348          * is called so we know whether to go to sync BRWs or wait for more
2349          * RPCs to complete */
2350         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2351                 cli->cl_w_in_flight--;
2352         else
2353                 cli->cl_r_in_flight--;
2354         osc_wake_cache_waiters(cli);
2355         spin_unlock(&cli->cl_loi_list_lock);
2356
2357         osc_io_unplug(env, cli, NULL);
2358         RETURN(rc);
2359 }
2360
2361 static void brw_commit(struct ptlrpc_request *req)
2362 {
2363         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2364          * this called via the rq_commit_cb, I need to ensure
2365          * osc_dec_unstable_pages is still called. Otherwise unstable
2366          * pages may be leaked. */
2367         spin_lock(&req->rq_lock);
2368         if (likely(req->rq_unstable)) {
2369                 req->rq_unstable = 0;
2370                 spin_unlock(&req->rq_lock);
2371
2372                 osc_dec_unstable_pages(req);
2373         } else {
2374                 req->rq_committed = 1;
2375                 spin_unlock(&req->rq_lock);
2376         }
2377 }
2378
2379 /**
2380  * Build an RPC by the list of extent @ext_list. The caller must ensure
2381  * that the total pages in this list are NOT over max pages per RPC.
2382  * Extents in the list must be in OES_RPC state.
2383  */
2384 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2385                   struct list_head *ext_list, int cmd)
2386 {
2387         struct ptlrpc_request           *req = NULL;
2388         struct osc_extent               *ext;
2389         struct brw_page                 **pga = NULL;
2390         struct osc_brw_async_args       *aa = NULL;
2391         struct obdo                     *oa = NULL;
2392         struct osc_async_page           *oap;
2393         struct osc_object               *obj = NULL;
2394         struct cl_req_attr              *crattr = NULL;
2395         loff_t                          starting_offset = OBD_OBJECT_EOF;
2396         loff_t                          ending_offset = 0;
2397         /* '1' for consistency with code that checks !mpflag to restore */
2398         int mpflag = 1;
2399         int                             mem_tight = 0;
2400         int                             page_count = 0;
2401         bool                            soft_sync = false;
2402         bool                            ndelay = false;
2403         int                             i;
2404         int                             grant = 0;
2405         int                             rc;
2406         __u32                           layout_version = 0;
2407         LIST_HEAD(rpc_list);
2408         struct ost_body                 *body;
2409         ENTRY;
2410         LASSERT(!list_empty(ext_list));
2411
2412         /* add pages into rpc_list to build BRW rpc */
2413         list_for_each_entry(ext, ext_list, oe_link) {
2414                 LASSERT(ext->oe_state == OES_RPC);
2415                 mem_tight |= ext->oe_memalloc;
2416                 grant += ext->oe_grants;
2417                 page_count += ext->oe_nr_pages;
2418                 layout_version = max(layout_version, ext->oe_layout_version);
2419                 if (obj == NULL)
2420                         obj = ext->oe_obj;
2421         }
2422
2423         soft_sync = osc_over_unstable_soft_limit(cli);
2424         if (mem_tight)
2425                 mpflag = memalloc_noreclaim_save();
2426
2427         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2428         if (pga == NULL)
2429                 GOTO(out, rc = -ENOMEM);
2430
2431         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2432         if (oa == NULL)
2433                 GOTO(out, rc = -ENOMEM);
2434
2435         i = 0;
2436         list_for_each_entry(ext, ext_list, oe_link) {
2437                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2438                         if (mem_tight)
2439                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2440                         if (soft_sync)
2441                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2442                         pga[i] = &oap->oap_brw_page;
2443                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2444                         i++;
2445
2446                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2447                         if (starting_offset == OBD_OBJECT_EOF ||
2448                             starting_offset > oap->oap_obj_off)
2449                                 starting_offset = oap->oap_obj_off;
2450                         else
2451                                 LASSERT(oap->oap_page_off == 0);
2452                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2453                                 ending_offset = oap->oap_obj_off +
2454                                                 oap->oap_count;
2455                         else
2456                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2457                                         PAGE_SIZE);
2458                 }
2459                 if (ext->oe_ndelay)
2460                         ndelay = true;
2461         }
2462
2463         /* first page in the list */
2464         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2465
2466         crattr = &osc_env_info(env)->oti_req_attr;
2467         memset(crattr, 0, sizeof(*crattr));
2468         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2469         crattr->cra_flags = ~0ULL;
2470         crattr->cra_page = oap2cl_page(oap);
2471         crattr->cra_oa = oa;
2472         cl_req_attr_set(env, osc2cl(obj), crattr);
2473
2474         if (cmd == OBD_BRW_WRITE) {
2475                 oa->o_grant_used = grant;
2476                 if (layout_version > 0) {
2477                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2478                                PFID(&oa->o_oi.oi_fid), layout_version);
2479
2480                         oa->o_layout_version = layout_version;
2481                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2482                 }
2483         }
2484
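        /* osc_brw_prep_request() asserts strictly increasing page offsets, so
         * sort pga[] first -- the pages were gathered above in extent order. */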
2485         sort_brw_pages(pga, page_count);
2486         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2487         if (rc != 0) {
2488                 CERROR("prep_req failed: %d\n", rc);
2489                 GOTO(out, rc);
2490         }
2491
2492         req->rq_commit_cb = brw_commit;
2493         req->rq_interpret_reply = brw_interpret;
2494         req->rq_memalloc = mem_tight != 0;
2495         oap->oap_request = ptlrpc_request_addref(req);
2496         if (ndelay) {
2497                 req->rq_no_resend = req->rq_no_delay = 1;
2498                 /* We should probably set a shorter timeout value here, to
2499                  * handle ETIMEDOUT in brw_interpret() correctly. */
2500                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2501         }
2502
2503         /* Need to update the timestamps after the request is built in case
2504          * we race with setattr (locally or in queue at the OST).  If the OST
2505          * gets a later setattr before an earlier BRW (as determined by the
2506          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2507          * is no obvious way to do this in a single call.  bug 10150 */
2508         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2509         crattr->cra_oa = &body->oa;
2510         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2511         cl_req_attr_set(env, osc2cl(obj), crattr);
2512         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2513
2514         aa = ptlrpc_req_async_args(aa, req);
2515         INIT_LIST_HEAD(&aa->aa_oaps);
2516         list_splice_init(&rpc_list, &aa->aa_oaps);
2517         INIT_LIST_HEAD(&aa->aa_exts);
2518         list_splice_init(ext_list, &aa->aa_exts);
2519
2520         spin_lock(&cli->cl_loi_list_lock);
2521         starting_offset >>= PAGE_SHIFT;
2522         if (cmd == OBD_BRW_READ) {
2523                 cli->cl_r_in_flight++;
2524                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2525                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2526                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2527                                       starting_offset + 1);
2528         } else {
2529                 cli->cl_w_in_flight++;
2530                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2531                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2532                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2533                                       starting_offset + 1);
2534         }
2535         spin_unlock(&cli->cl_loi_list_lock);
2536
2537         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2538                   page_count, aa, cli->cl_r_in_flight,
2539                   cli->cl_w_in_flight);
2540         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2541
2542         ptlrpcd_add_req(req);
2543         rc = 0;
2544         EXIT;
2545
2546 out:
2547         if (mem_tight)
2548                 memalloc_noreclaim_restore(mpflag);
2549
2550         if (rc != 0) {
2551                 LASSERT(req == NULL);
2552
2553                 if (oa)
2554                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2555                 if (pga) {
2556                         osc_release_bounce_pages(pga, page_count);
2557                         osc_release_ppga(pga, page_count);
2558                 }
2559                 /* this should happen rarely and is pretty bad; it makes the
2560                  * pending list not follow the dirty order */
2561                 while (!list_empty(ext_list)) {
2562                         ext = list_entry(ext_list->next, struct osc_extent,
2563                                          oe_link);
2564                         list_del_init(&ext->oe_link);
2565                         osc_extent_finish(env, ext, 0, rc);
2566                 }
2567         }
2568         RETURN(rc);
2569 }
2570
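/* Attach @data to @lock's l_ast_data if not set yet; return 1 if l_ast_data
 * now matches @data, 0 if the lock is already owned by another user. */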
2571 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2572 {
2573         int set = 0;
2574
2575         LASSERT(lock != NULL);
2576
2577         lock_res_and_lock(lock);
2578
2579         if (lock->l_ast_data == NULL)
2580                 lock->l_ast_data = data;
2581         if (lock->l_ast_data == data)
2582                 set = 1;
2583
2584         unlock_res_and_lock(lock);
2585
2586         return set;
2587 }
2588
2589 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2590                      void *cookie, struct lustre_handle *lockh,
2591                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2592                      int errcode)
2593 {
2594         bool intent = *flags & LDLM_FL_HAS_INTENT;
2595         int rc;
2596         ENTRY;
2597
2598         /* The request was created before ldlm_cli_enqueue call. */
2599         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2600                 struct ldlm_reply *rep;
2601
2602                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2603                 LASSERT(rep != NULL);
2604
2605                 rep->lock_policy_res1 =
2606                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2607                 if (rep->lock_policy_res1)
2608                         errcode = rep->lock_policy_res1;
2609                 if (!speculative)
2610                         *flags |= LDLM_FL_LVB_READY;
2611         } else if (errcode == ELDLM_OK) {
2612                 *flags |= LDLM_FL_LVB_READY;
2613         }
2614
2615         /* Call the update callback. */
2616         rc = (*upcall)(cookie, lockh, errcode);
2617
2618         /* release the reference taken in ldlm_cli_enqueue() */
2619         if (errcode == ELDLM_LOCK_MATCHED)
2620                 errcode = ELDLM_OK;
2621         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2622                 ldlm_lock_decref(lockh, mode);
2623
2624         RETURN(rc);
2625 }
2626
2627 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2628                           void *args, int rc)
2629 {
2630         struct osc_enqueue_args *aa = args;
2631         struct ldlm_lock *lock;
2632         struct lustre_handle *lockh = &aa->oa_lockh;
2633         enum ldlm_mode mode = aa->oa_mode;
2634         struct ost_lvb *lvb = aa->oa_lvb;
2635         __u32 lvb_len = sizeof(*lvb);
2636         __u64 flags = 0;
2637
2638         ENTRY;
2639
2640         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2641          * be valid. */
2642         lock = ldlm_handle2lock(lockh);
2643         LASSERTF(lock != NULL,
2644                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2645                  lockh->cookie, req, aa);
2646
2647         /* Take an additional reference so that a blocking AST that
2648          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2649          * to arrive after an upcall has been executed by
2650          * osc_enqueue_fini(). */
2651         ldlm_lock_addref(lockh, mode);
2652
2653         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2654         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2655
2656         /* Let the CP AST grant the lock first. */
2657         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2658
2659         if (aa->oa_speculative) {
2660                 LASSERT(aa->oa_lvb == NULL);
2661                 LASSERT(aa->oa_flags == NULL);
2662                 aa->oa_flags = &flags;
2663         }
2664
2665         /* Complete obtaining the lock procedure. */
2666         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2667                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2668                                    lockh, rc);
2669         /* Complete osc stuff. */
2670         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2671                               aa->oa_flags, aa->oa_speculative, rc);
2672
2673         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2674
2675         ldlm_lock_decref(lockh, mode);
2676         LDLM_LOCK_PUT(lock);
2677         RETURN(rc);
2678 }
2679
2680 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2681  * lock from the 2nd OSC before a lock from the 1st one. This does not
2682  * deadlock with other synchronous requests, but holding some locks while
2683  * trying to obtain others may take a considerable amount of time in the case
2684  * of an OST failure; and when other sync requests cannot get a lock released
2685  * by a client, that client is evicted from the cluster -- such scenarios make
2686  * life difficult, so release locks just after they are obtained. */
2687 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2688                      __u64 *flags, union ldlm_policy_data *policy,
2689                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2690                      void *cookie, struct ldlm_enqueue_info *einfo,
2691                      struct ptlrpc_request_set *rqset, int async,
2692                      bool speculative)
2693 {
2694         struct obd_device *obd = exp->exp_obd;
2695         struct lustre_handle lockh = { 0 };
2696         struct ptlrpc_request *req = NULL;
2697         int intent = *flags & LDLM_FL_HAS_INTENT;
2698         __u64 match_flags = *flags;
2699         enum ldlm_mode mode;
2700         int rc;
2701         ENTRY;
2702
2703         /* Filesystem lock extents are extended to page boundaries so that
2704          * dealing with the page cache is a little smoother.  */
2705         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2706         policy->l_extent.end |= ~PAGE_MASK;
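        /*
         * Worked example (assuming 4 KiB pages, i.e. ~PAGE_MASK == 0xfff):
         * a requested extent of [5000, 6000] is widened to [4096, 8191],
         * rounding the start down and the end up to page boundaries.
         */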
2707
2708         /* Next, search for already existing extent locks that will cover us */
2709         /* If we're trying to read, we also search for an existing PW lock.  The
2710          * VFS and page cache already protect us locally, so lots of readers/
2711          * writers can share a single PW lock.
2712          *
2713          * There are problems with conversion deadlocks, so instead of
2714          * converting a read lock to a write lock, we'll just enqueue a new
2715          * one.
2716          *
2717          * At some point we should cancel the read lock instead of making them
2718          * send us a blocking callback, but there are problems with canceling
2719          * locks out from other users right now, too. */
2720         mode = einfo->ei_mode;
2721         if (einfo->ei_mode == LCK_PR)
2722                 mode |= LCK_PW;
2723         /* Normal lock requests must wait for the LVB to be ready before
2724          * matching a lock; speculative lock requests do not need to,
2725          * because they will not actually use the lock. */
2726         if (!speculative)
2727                 match_flags |= LDLM_FL_LVB_READY;
2728         if (intent != 0)
2729                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2730         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2731                                einfo->ei_type, policy, mode, &lockh, 0);
2732         if (mode) {
2733                 struct ldlm_lock *matched;
2734
2735                 if (*flags & LDLM_FL_TEST_LOCK)
2736                         RETURN(ELDLM_OK);
2737
2738                 matched = ldlm_handle2lock(&lockh);
2739                 if (speculative) {
2740                         /* This DLM lock request is speculative, and does not
2741                          * have an associated IO request. Therefore if there
2742                          * is already a DLM lock, it will just inform the
2743                          * caller to cancel the request for this stripe. */
2744                         lock_res_and_lock(matched);
2745                         if (ldlm_extent_equal(&policy->l_extent,
2746                             &matched->l_policy_data.l_extent))
2747                                 rc = -EEXIST;
2748                         else
2749                                 rc = -ECANCELED;
2750                         unlock_res_and_lock(matched);
2751
2752                         ldlm_lock_decref(&lockh, mode);
2753                         LDLM_LOCK_PUT(matched);
2754                         RETURN(rc);
2755                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2756                         *flags |= LDLM_FL_LVB_READY;
2757
2758                         /* We already have a lock, and it's referenced. */
2759                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2760
2761                         ldlm_lock_decref(&lockh, mode);
2762                         LDLM_LOCK_PUT(matched);
2763                         RETURN(ELDLM_OK);
2764                 } else {
2765                         ldlm_lock_decref(&lockh, mode);
2766                         LDLM_LOCK_PUT(matched);
2767                 }
2768         }
2769
2770         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2771                 RETURN(-ENOLCK);
2772
2773         if (intent) {
2774                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2775                                            &RQF_LDLM_ENQUEUE_LVB);
2776                 if (req == NULL)
2777                         RETURN(-ENOMEM);
2778
2779                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2780                 if (rc) {
2781                         ptlrpc_request_free(req);
2782                         RETURN(rc);
2783                 }
2784
2785                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2786                              sizeof(*lvb));
2787                 ptlrpc_request_set_replen(req);
2788         }
2789
2790         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2791         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2792
2793         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2794                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2795         if (async) {
2796                 if (!rc) {
2797                         struct osc_enqueue_args *aa;
2798                         aa = ptlrpc_req_async_args(aa, req);
2799                         aa->oa_exp         = exp;
2800                         aa->oa_mode        = einfo->ei_mode;
2801                         aa->oa_type        = einfo->ei_type;
2802                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2803                         aa->oa_upcall      = upcall;
2804                         aa->oa_cookie      = cookie;
2805                         aa->oa_speculative = speculative;
2806                         if (!speculative) {
2807                                 aa->oa_flags  = flags;
2808                                 aa->oa_lvb    = lvb;
2809                         } else {
2810                                 /* speculative locks essentially enqueue a
2811                                  * DLM lock in advance, so we don't care
2812                                  * about the result of the enqueue. */
2813                                 aa->oa_lvb    = NULL;
2814                                 aa->oa_flags  = NULL;
2815                         }
2816
2817                         req->rq_interpret_reply = osc_enqueue_interpret;
2818                         ptlrpc_set_add_req(rqset, req);
2819                 } else if (intent) {
2820                         ptlrpc_req_finished(req);
2821                 }
2822                 RETURN(rc);
2823         }
2824
2825         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2826                               flags, speculative, rc);
2827         if (intent)
2828                 ptlrpc_req_finished(req);
2829
2830         RETURN(rc);
2831 }
2832
2833 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2834                    struct ldlm_res_id *res_id, enum ldlm_type type,
2835                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2836                    __u64 *flags, struct osc_object *obj,
2837                    struct lustre_handle *lockh, int unref)
2838 {
2839         struct obd_device *obd = exp->exp_obd;
2840         __u64 lflags = *flags;
2841         enum ldlm_mode rc;
2842         ENTRY;
2843
2844         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2845                 RETURN(-EIO);
2846
2847         /* Filesystem lock extents are extended to page boundaries so that
2848          * dealing with the page cache is a little smoother */
2849         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2850         policy->l_extent.end |= ~PAGE_MASK;
2851
2852         /* Next, search for already existing extent locks that will cover us */
2853         /* If we're trying to read, we also search for an existing PW lock.  The
2854          * VFS and page cache already protect us locally, so lots of readers/
2855          * writers can share a single PW lock. */
2856         rc = mode;
2857         if (mode == LCK_PR)
2858                 rc |= LCK_PW;
2859         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2860                              res_id, type, policy, rc, lockh, unref);
2861         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2862                 RETURN(rc);
2863
2864         if (obj != NULL) {
2865                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2866
2867                 LASSERT(lock != NULL);
2868                 if (osc_set_lock_data(lock, obj)) {
2869                         lock_res_and_lock(lock);
2870                         if (!ldlm_is_lvb_cached(lock)) {
2871                                 LASSERT(lock->l_ast_data == obj);
2872                                 osc_lock_lvb_update(env, obj, lock, NULL);
2873                                 ldlm_set_lvb_cached(lock);
2874                         }
2875                         unlock_res_and_lock(lock);
2876                 } else {
2877                         ldlm_lock_decref(lockh, rc);
2878                         rc = 0;
2879                 }
2880                 LDLM_LOCK_PUT(lock);
2881         }
2882         RETURN(rc);
2883 }
2884
2885 static int osc_statfs_interpret(const struct lu_env *env,
2886                                 struct ptlrpc_request *req, void *args, int rc)
2887 {
2888         struct osc_async_args *aa = args;
2889         struct obd_statfs *msfs;
2890
2891         ENTRY;
2892         if (rc == -EBADR)
2893                 /*
2894                  * The request has in fact never been sent due to issues at
2895                  * a higher level (LOV).  Exit immediately since the caller
2896                  * is aware of the problem and takes care of the clean up.
2897                  */
2898                 RETURN(rc);
2899
2900         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2901             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2902                 GOTO(out, rc = 0);
2903
2904         if (rc != 0)
2905                 GOTO(out, rc);
2906
2907         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2908         if (msfs == NULL)
2909                 GOTO(out, rc = -EPROTO);
2910
2911         *aa->aa_oi->oi_osfs = *msfs;
2912 out:
2913         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2914
2915         RETURN(rc);
2916 }
2917
2918 static int osc_statfs_async(struct obd_export *exp,
2919                             struct obd_info *oinfo, time64_t max_age,
2920                             struct ptlrpc_request_set *rqset)
2921 {
2922         struct obd_device     *obd = class_exp2obd(exp);
2923         struct ptlrpc_request *req;
2924         struct osc_async_args *aa;
2925         int rc;
2926         ENTRY;
2927
2928         if (obd->obd_osfs_age >= max_age) {
2929                 CDEBUG(D_SUPER,
2930                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2931                        obd->obd_name, &obd->obd_osfs,
2932                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2933                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2934                 spin_lock(&obd->obd_osfs_lock);
2935                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2936                 spin_unlock(&obd->obd_osfs_lock);
2937                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2938                 if (oinfo->oi_cb_up)
2939                         oinfo->oi_cb_up(oinfo, 0);
2940
2941                 RETURN(0);
2942         }
2943
2944         /* We could possibly pass max_age in the request (as an absolute
2945          * timestamp or a "seconds.usec ago") so the target can avoid doing
2946          * extra calls into the filesystem if that isn't necessary (e.g.
2947          * during mount that would help a bit).  Having relative timestamps
2948          * is not so great if request processing is slow, while absolute
2949          * timestamps are not ideal because they need time synchronization. */
2950         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2951         if (req == NULL)
2952                 RETURN(-ENOMEM);
2953
2954         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2955         if (rc) {
2956                 ptlrpc_request_free(req);
2957                 RETURN(rc);
2958         }
2959         ptlrpc_request_set_replen(req);
2960         req->rq_request_portal = OST_CREATE_PORTAL;
2961         ptlrpc_at_set_req_timeout(req);
2962
2963         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2964                 /* procfs requests must not wait, to avoid deadlock */
2965                 req->rq_no_resend = 1;
2966                 req->rq_no_delay = 1;
2967         }
2968
2969         req->rq_interpret_reply = osc_statfs_interpret;
2970         aa = ptlrpc_req_async_args(aa, req);
2971         aa->aa_oi = oinfo;
2972
2973         ptlrpc_set_add_req(rqset, req);
2974         RETURN(0);
2975 }
2976
2977 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2978                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2979 {
2980         struct obd_device     *obd = class_exp2obd(exp);
2981         struct obd_statfs     *msfs;
2982         struct ptlrpc_request *req;
2983         struct obd_import     *imp = NULL;
2984         int rc;
2985         ENTRY;
2986
2987
2988         /* Since the request might also come from lprocfs, we need to sync
2989          * this with client_disconnect_export(); see Bug15684. */
2990         down_read(&obd->u.cli.cl_sem);
2991         if (obd->u.cli.cl_import)
2992                 imp = class_import_get(obd->u.cli.cl_import);
2993         up_read(&obd->u.cli.cl_sem);
2994         if (!imp)
2995                 RETURN(-ENODEV);
2996
2997         /* We could possibly pass max_age in the request (as an absolute
2998          * timestamp or a "seconds.usec ago") so the target can avoid doing
2999          * extra calls into the filesystem if that isn't necessary (e.g.
3000          * during mount that would help a bit).  Having relative timestamps
3001          * is not so great if request processing is slow, while absolute
3002          * timestamps are not ideal because they need time synchronization. */
3003         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3004
3005         class_import_put(imp);
3006
3007         if (req == NULL)
3008                 RETURN(-ENOMEM);
3009
3010         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3011         if (rc) {
3012                 ptlrpc_request_free(req);
3013                 RETURN(rc);
3014         }
3015         ptlrpc_request_set_replen(req);
3016         req->rq_request_portal = OST_CREATE_PORTAL;
3017         ptlrpc_at_set_req_timeout(req);
3018
3019         if (flags & OBD_STATFS_NODELAY) {
3020                 /* procfs requests must not wait, to avoid deadlock */
3021                 req->rq_no_resend = 1;
3022                 req->rq_no_delay = 1;
3023         }
3024
3025         rc = ptlrpc_queue_wait(req);
3026         if (rc)
3027                 GOTO(out, rc);
3028
3029         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3030         if (msfs == NULL)
3031                 GOTO(out, rc = -EPROTO);
3032
3033         *osfs = *msfs;
3034
3035         EXIT;
3036 out:
3037         ptlrpc_req_finished(req);
3038         return rc;
3039 }
3040
3041 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3042                          void *karg, void __user *uarg)
3043 {
3044         struct obd_device *obd = exp->exp_obd;
3045         struct obd_ioctl_data *data = karg;
3046         int rc = 0;
3047
3048         ENTRY;
3049         if (!try_module_get(THIS_MODULE)) {
3050                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3051                        module_name(THIS_MODULE));
3052                 return -EINVAL;
3053         }
3054         switch (cmd) {
3055         case OBD_IOC_CLIENT_RECOVER:
3056                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3057                                            data->ioc_inlbuf1, 0);
3058                 if (rc > 0)
3059                         rc = 0;
3060                 break;
3061         case IOC_OSC_SET_ACTIVE:
3062                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3063                                               data->ioc_offset);
3064                 break;
3065         default:
3066                 rc = -ENOTTY;
3067                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3068                        obd->obd_name, cmd, current->comm, rc);
3069                 break;
3070         }
3071
3072         module_put(THIS_MODULE);
3073         return rc;
3074 }
3075
3076 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3077                        u32 keylen, void *key, u32 vallen, void *val,
3078                        struct ptlrpc_request_set *set)
3079 {
3080         struct ptlrpc_request *req;
3081         struct obd_device     *obd = exp->exp_obd;
3082         struct obd_import     *imp = class_exp2cliimp(exp);
3083         char                  *tmp;
3084         int                    rc;
3085         ENTRY;
3086
3087         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3088
3089         if (KEY_IS(KEY_CHECKSUM)) {
3090                 if (vallen != sizeof(int))
3091                         RETURN(-EINVAL);
3092                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3093                 RETURN(0);
3094         }
3095
3096         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3097                 sptlrpc_conf_client_adapt(obd);
3098                 RETURN(0);
3099         }
3100
3101         if (KEY_IS(KEY_FLUSH_CTX)) {
3102                 sptlrpc_import_flush_my_ctx(imp);
3103                 RETURN(0);
3104         }
3105
3106         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3107                 struct client_obd *cli = &obd->u.cli;
3108                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3109                 long target = *(long *)val;
3110
3111                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3112                 *(long *)val -= nr;
3113                 RETURN(0);
3114         }
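        /*
         * Worked example for the branch above (illustrative numbers): with
         * 1000 pages on the LRU list, nr starts at 500; a caller target of
         * 300 shrinks min(500, 300) = 300 pages, and *val is decremented by
         * however many pages osc_lru_shrink() actually reclaimed.
         */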
3115
3116         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3117                 RETURN(-EINVAL);
3118
3119         /* We pass all other commands directly to the OST. Since nobody calls
3120          * osc methods directly and everybody is supposed to go through LOV,
3121          * we assume LOV checked invalid values for us.
3122          * The only recognised values so far are evict_by_nid and mds_conn.
3123          * Even if something bad goes through, we'd get a -EINVAL from the
3124          * OST anyway. */
3125
3126         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3127                                                 &RQF_OST_SET_GRANT_INFO :
3128                                                 &RQF_OBD_SET_INFO);
3129         if (req == NULL)
3130                 RETURN(-ENOMEM);
3131
3132         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3133                              RCL_CLIENT, keylen);
3134         if (!KEY_IS(KEY_GRANT_SHRINK))
3135                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3136                                      RCL_CLIENT, vallen);
3137         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3138         if (rc) {
3139                 ptlrpc_request_free(req);
3140                 RETURN(rc);
3141         }
3142
3143         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3144         memcpy(tmp, key, keylen);
3145         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3146                                                         &RMF_OST_BODY :
3147                                                         &RMF_SETINFO_VAL);
3148         memcpy(tmp, val, vallen);
3149
3150         if (KEY_IS(KEY_GRANT_SHRINK)) {
3151                 struct osc_grant_args *aa;
3152                 struct obdo *oa;
3153
3154                 aa = ptlrpc_req_async_args(aa, req);
3155                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3156                 if (!oa) {
3157                         ptlrpc_req_finished(req);
3158                         RETURN(-ENOMEM);
3159                 }
3160                 *oa = ((struct ost_body *)val)->oa;
3161                 aa->aa_oa = oa;
3162                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3163         }
3164
3165         ptlrpc_request_set_replen(req);
3166         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3167                 LASSERT(set != NULL);
3168                 ptlrpc_set_add_req(set, req);
3169                 ptlrpc_check_set(NULL, set);
3170         } else {
3171                 ptlrpcd_add_req(req);
3172         }
3173
3174         RETURN(0);
3175 }
3176 EXPORT_SYMBOL(osc_set_info_async);
3177
3178 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3179                   struct obd_device *obd, struct obd_uuid *cluuid,
3180                   struct obd_connect_data *data, void *localdata)
3181 {
3182         struct client_obd *cli = &obd->u.cli;
3183
3184         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3185                 long lost_grant;
3186                 long grant;
3187
3188                 spin_lock(&cli->cl_loi_list_lock);
3189                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3190                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3191                         /* restore ocd_grant_blkbits as client page bits */
3192                         data->ocd_grant_blkbits = PAGE_SHIFT;
3193                         grant += cli->cl_dirty_grant;
3194                 } else {
3195                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3196                 }
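                /*
                 * GNU "?:" shorthand: if no grant is currently held, ask the
                 * server for two full-sized BRW RPCs worth of grant instead.
                 */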
3197                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3198                 lost_grant = cli->cl_lost_grant;
3199                 cli->cl_lost_grant = 0;
3200                 spin_unlock(&cli->cl_loi_list_lock);
3201
3202                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3203                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3204                        data->ocd_version, data->ocd_grant, lost_grant);
3205         }
3206
3207         RETURN(0);
3208 }
3209 EXPORT_SYMBOL(osc_reconnect);
3210
3211 int osc_disconnect(struct obd_export *exp)
3212 {
3213         struct obd_device *obd = class_exp2obd(exp);
3214         int rc;
3215
3216         rc = client_disconnect_export(exp);
3217         /**
3218          * Initially we put del_shrink_grant before disconnect_export, but it
3219          * causes the following problem if setup (connect) and cleanup
3220          * (disconnect) are tangled together.
3221          *      connect p1                     disconnect p2
3222          *   ptlrpc_connect_import
3223          *     ...............               class_manual_cleanup
3224          *                                     osc_disconnect
3225          *                                     del_shrink_grant
3226          *   ptlrpc_connect_interrupt
3227          *     osc_init_grant
3228          *   add this client to shrink list
3229          *                                      cleanup_osc
3230          * Bang! The grant shrink thread triggers the shrink. BUG18662
3231          */
3232         osc_del_grant_list(&obd->u.cli);
3233         return rc;
3234 }
3235 EXPORT_SYMBOL(osc_disconnect);
3236
3237 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3238                                  struct hlist_node *hnode, void *arg)
3239 {
3240         struct lu_env *env = arg;
3241         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3242         struct ldlm_lock *lock;
3243         struct osc_object *osc = NULL;
3244         ENTRY;
3245
3246         lock_res(res);
3247         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3248                 if (lock->l_ast_data != NULL && osc == NULL) {
3249                         osc = lock->l_ast_data;
3250                         cl_object_get(osc2cl(osc));
3251                 }
3252
3253                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
3254                  * be canceled by the 2nd ldlm_namespace_cleanup() call in
3255                  * osc_import_event(). */
3256                 ldlm_clear_cleaned(lock);
3257         }
3258         unlock_res(res);
3259
3260         if (osc != NULL) {
3261                 osc_object_invalidate(env, osc);
3262                 cl_object_put(env, osc2cl(osc));
3263         }
3264
3265         RETURN(0);
3266 }
3267 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3268
3269 static int osc_import_event(struct obd_device *obd,
3270                             struct obd_import *imp,
3271                             enum obd_import_event event)
3272 {
3273         struct client_obd *cli;
3274         int rc = 0;
3275
3276         ENTRY;
3277         LASSERT(imp->imp_obd == obd);
3278
3279         switch (event) {
3280         case IMP_EVENT_DISCON: {
3281                 cli = &obd->u.cli;
3282                 spin_lock(&cli->cl_loi_list_lock);
3283                 cli->cl_avail_grant = 0;
3284                 cli->cl_lost_grant = 0;
3285                 spin_unlock(&cli->cl_loi_list_lock);
3286                 break;
3287         }
3288         case IMP_EVENT_INACTIVE: {
3289                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3290                 break;
3291         }
3292         case IMP_EVENT_INVALIDATE: {
3293                 struct ldlm_namespace *ns = obd->obd_namespace;
3294                 struct lu_env         *env;
3295                 __u16                  refcheck;
3296
3297                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3298
3299                 env = cl_env_get(&refcheck);
3300                 if (!IS_ERR(env)) {
3301                         osc_io_unplug(env, &obd->u.cli, NULL);
3302
3303                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3304                                                  osc_ldlm_resource_invalidate,
3305                                                  env, 0);
3306                         cl_env_put(env, &refcheck);
3307
3308                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3309                 } else
3310                         rc = PTR_ERR(env);
3311                 break;
3312         }
3313         case IMP_EVENT_ACTIVE: {
3314                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3315                 break;
3316         }
3317         case IMP_EVENT_OCD: {
3318                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3319
3320                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3321                         osc_init_grant(&obd->u.cli, ocd);
3322
3323                 /* See bug 7198 */
3324                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3325                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3326
3327                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3328                 break;
3329         }
3330         case IMP_EVENT_DEACTIVATE: {
3331                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3332                 break;
3333         }
3334         case IMP_EVENT_ACTIVATE: {
3335                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3336                 break;
3337         }
3338         default:
3339                 CERROR("Unknown import event %d\n", event);
3340                 LBUG();
3341         }
3342         RETURN(rc);
3343 }
3344
3345 /**
3346  * Determine whether the lock can be canceled instead of being replayed
3347  * during recovery; see bug16774 for detailed information.
3348  *
3349  * \retval zero the lock can't be canceled
3350  * \retval other ok to cancel
3351  */
3352 static int osc_cancel_weight(struct ldlm_lock *lock)
3353 {
3354         /*
3355          * Cancel all unused and granted extent locks.
3356          */
3357         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3358             ldlm_is_granted(lock) &&
3359             osc_ldlm_weigh_ast(lock) == 0)
3360                 RETURN(1);
3361
3362         RETURN(0);
3363 }
3364
3365 static int brw_queue_work(const struct lu_env *env, void *data)
3366 {
3367         struct client_obd *cli = data;
3368
3369         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3370
3371         osc_io_unplug(env, cli, NULL);
3372         RETURN(0);
3373 }
3374
3375 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3376 {
3377         struct client_obd *cli = &obd->u.cli;
3378         void *handler;
3379         int rc;
3380
3381         ENTRY;
3382
3383         rc = ptlrpcd_addref();
3384         if (rc)
3385                 RETURN(rc);
3386
3387         rc = client_obd_setup(obd, lcfg);
3388         if (rc)
3389                 GOTO(out_ptlrpcd, rc);
3390
3391
3392         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3393         if (IS_ERR(handler))
3394                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3395         cli->cl_writeback_work = handler;
3396
3397         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3398         if (IS_ERR(handler))
3399                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3400         cli->cl_lru_work = handler;
3401
3402         rc = osc_quota_setup(obd);
3403         if (rc)
3404                 GOTO(out_ptlrpcd_work, rc);
3405
3406         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3407         osc_update_next_shrink(cli);
3408
3409         RETURN(rc);
3410
3411 out_ptlrpcd_work:
3412         if (cli->cl_writeback_work != NULL) {
3413                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3414                 cli->cl_writeback_work = NULL;
3415         }
3416         if (cli->cl_lru_work != NULL) {
3417                 ptlrpcd_destroy_work(cli->cl_lru_work);
3418                 cli->cl_lru_work = NULL;
3419         }
3420         client_obd_cleanup(obd);
3421 out_ptlrpcd:
3422         ptlrpcd_decref();
3423         RETURN(rc);
3424 }
3425 EXPORT_SYMBOL(osc_setup_common);
3426
3427 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3428 {
3429         struct client_obd *cli = &obd->u.cli;
3430         int                adding;
3431         int                added;
3432         int                req_count;
3433         int                rc;
3434
3435         ENTRY;
3436
3437         rc = osc_setup_common(obd, lcfg);
3438         if (rc < 0)
3439                 RETURN(rc);
3440
3441         rc = osc_tunables_init(obd);
3442         if (rc)
3443                 RETURN(rc);
3444
3445         /*
3446          * We try to control the total number of requests with an upper limit,
3447          * osc_reqpool_maxreqcount. There might be a race that causes an
3448          * over-limit allocation, but it is fine.
3449          */
3450         req_count = atomic_read(&osc_pool_req_count);
3451         if (req_count < osc_reqpool_maxreqcount) {
3452                 adding = cli->cl_max_rpcs_in_flight + 2;
3453                 if (req_count + adding > osc_reqpool_maxreqcount)
3454                         adding = osc_reqpool_maxreqcount - req_count;
3455
3456                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3457                 atomic_add(added, &osc_pool_req_count);
3458         }
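        /*
         * Worked example (illustrative numbers): with osc_reqpool_maxreqcount
         * == 40, req_count == 37 and cl_max_rpcs_in_flight == 8, "adding" is
         * first 8 + 2 = 10 and is then clamped to 40 - 37 = 3, so the pool
         * never grows past its configured maximum.
         */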
3459
3460         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3461
3462         spin_lock(&osc_shrink_lock);
3463         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3464         spin_unlock(&osc_shrink_lock);
3465         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3466         cli->cl_import->imp_idle_debug = D_HA;
3467
3468         RETURN(0);
3469 }
3470
3471 int osc_precleanup_common(struct obd_device *obd)
3472 {
3473         struct client_obd *cli = &obd->u.cli;
3474         ENTRY;
3475
3476         /* LU-464
3477          * For the echo client, the export may be on the zombie list; wait
3478          * for the zombie thread to cull it, because cli.cl_import will be
3479          * cleared in client_disconnect_export():
3480          *   class_export_destroy() -> obd_cleanup() ->
3481          *   echo_device_free() -> echo_client_cleanup() ->
3482          *   obd_disconnect() -> osc_disconnect() ->
3483          *   client_disconnect_export()
3484          */
3485         obd_zombie_barrier();
3486         if (cli->cl_writeback_work) {
3487                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3488                 cli->cl_writeback_work = NULL;
3489         }
3490
3491         if (cli->cl_lru_work) {
3492                 ptlrpcd_destroy_work(cli->cl_lru_work);
3493                 cli->cl_lru_work = NULL;
3494         }
3495
3496         obd_cleanup_client_import(obd);
3497         RETURN(0);
3498 }
3499 EXPORT_SYMBOL(osc_precleanup_common);
3500
3501 static int osc_precleanup(struct obd_device *obd)
3502 {
3503         ENTRY;
3504
3505         osc_precleanup_common(obd);
3506
3507         ptlrpc_lprocfs_unregister_obd(obd);
3508         RETURN(0);
3509 }
3510
3511 int osc_cleanup_common(struct obd_device *obd)
3512 {
3513         struct client_obd *cli = &obd->u.cli;
3514         int rc;
3515
3516         ENTRY;
3517
3518         spin_lock(&osc_shrink_lock);
3519         list_del(&cli->cl_shrink_list);
3520         spin_unlock(&osc_shrink_lock);
3521
3522         /* lru cleanup */
3523         if (cli->cl_cache != NULL) {
3524                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3525                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3526                 list_del_init(&cli->cl_lru_osc);
3527                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3528                 cli->cl_lru_left = NULL;
3529                 cl_cache_decref(cli->cl_cache);
3530                 cli->cl_cache = NULL;
3531         }
3532
3533         /* free memory of osc quota cache */
3534         osc_quota_cleanup(obd);
3535
3536         rc = client_obd_cleanup(obd);
3537
3538         ptlrpcd_decref();
3539         RETURN(rc);
3540 }
3541 EXPORT_SYMBOL(osc_cleanup_common);
3542
3543 static const struct obd_ops osc_obd_ops = {
3544         .o_owner                = THIS_MODULE,
3545         .o_setup                = osc_setup,
3546         .o_precleanup           = osc_precleanup,
3547         .o_cleanup              = osc_cleanup_common,
3548         .o_add_conn             = client_import_add_conn,
3549         .o_del_conn             = client_import_del_conn,
3550         .o_connect              = client_connect_import,
3551         .o_reconnect            = osc_reconnect,
3552         .o_disconnect           = osc_disconnect,
3553         .o_statfs               = osc_statfs,
3554         .o_statfs_async         = osc_statfs_async,
3555         .o_create               = osc_create,
3556         .o_destroy              = osc_destroy,
3557         .o_getattr              = osc_getattr,
3558         .o_setattr              = osc_setattr,
3559         .o_iocontrol            = osc_iocontrol,
3560         .o_set_info_async       = osc_set_info_async,
3561         .o_import_event         = osc_import_event,
3562         .o_quotactl             = osc_quotactl,
3563 };
3564
3565 static struct shrinker *osc_cache_shrinker;
3566 LIST_HEAD(osc_shrink_list);
3567 DEFINE_SPINLOCK(osc_shrink_lock);
3568
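/*
 * Compatibility shim: kernels without HAVE_SHRINKER_COUNT expose a single
 * ->shrink() method instead of the split count/scan API.  The wrapper below
 * emulates the old behaviour by scanning first and then reporting the
 * remaining object count.
 */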
3569 #ifndef HAVE_SHRINKER_COUNT
3570 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3571 {
3572         struct shrink_control scv = {
3573                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3574                 .gfp_mask   = shrink_param(sc, gfp_mask)
3575         };
3576         (void)osc_cache_shrink_scan(shrinker, &scv);
3577
3578         return osc_cache_shrink_count(shrinker, &scv);
3579 }
3580 #endif
3581
3582 static int __init osc_init(void)
3583 {
3584         unsigned int reqpool_size;
3585         unsigned int reqsize;
3586         int rc;
3587         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3588                          osc_cache_shrink_count, osc_cache_shrink_scan);
3589         ENTRY;
3590
3591         /* Print the address of _any_ initialized kernel symbol from this
3592          * module, to allow debugging with a gdb that doesn't support data
3593          * symbols from modules. */
3594         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3595
3596         rc = lu_kmem_init(osc_caches);
3597         if (rc)
3598                 RETURN(rc);
3599
3600         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3601                                  LUSTRE_OSC_NAME, &osc_device_type);
3602         if (rc)
3603                 GOTO(out_kmem, rc);
3604
3605         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3606
3607         /* This is obviously too much memory; we only prevent overflow here */
3608         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3609                 GOTO(out_type, rc = -EINVAL);
3610
3611         reqpool_size = osc_reqpool_mem_max << 20;
3612
3613         reqsize = 1;
3614         while (reqsize < OST_IO_MAXREQSIZE)
3615                 reqsize = reqsize << 1;
3616
3617         /*
3618          * We don't enlarge the request count in the OSC pool according to
3619          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3620          * after a normal allocation fails, so a small OSC pool won't cause
3621          * much performance degradation in most cases.
3622          */
3623         osc_reqpool_maxreqcount = reqpool_size / reqsize;
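        /*
         * Worked example (illustrative; OST_IO_MAXREQSIZE depends on the
         * build configuration): if OST_IO_MAXREQSIZE were 700 KiB, reqsize
         * would round up to the next power of two, 1 MiB, and the default
         * 5 MiB pool would then hold osc_reqpool_maxreqcount = 5 requests.
         */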
3624
3625         atomic_set(&osc_pool_req_count, 0);
3626         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3627                                           ptlrpc_add_rqs_to_pool);
3628
3629         if (osc_rq_pool == NULL)
3630                 GOTO(out_type, rc = -ENOMEM);
3631
3632         rc = osc_start_grant_work();
3633         if (rc != 0)
3634                 GOTO(out_req_pool, rc);
3635
3636         RETURN(rc);
3637
3638 out_req_pool:
3639         ptlrpc_free_rq_pool(osc_rq_pool);
3640 out_type:
3641         class_unregister_type(LUSTRE_OSC_NAME);
3642 out_kmem:
3643         lu_kmem_fini(osc_caches);
3644
3645         RETURN(rc);
3646 }
3647
3648 static void __exit osc_exit(void)
3649 {
3650         osc_stop_grant_work();
3651         remove_shrinker(osc_cache_shrinker);
3652         class_unregister_type(LUSTRE_OSC_NAME);
3653         lu_kmem_fini(osc_caches);
3654         ptlrpc_free_rq_pool(osc_rq_pool);
3655 }
3656
3657 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3658 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3659 MODULE_VERSION(LUSTRE_VERSION_STRING);
3660 MODULE_LICENSE("GPL");
3661
3662 module_init(osc_init);
3663 module_exit(osc_exit);