/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

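/* seconds a client connection may stay idle before being disconnected;
 * tunable at runtime through the module parameter below */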
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
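
/*
 * A minimal caller sketch for osc_setattr_async() (hypothetical names).
 * Note from the code above: when @rqset is NULL the request goes straight
 * to ptlrpcd and @upcall is never installed, so callers that need a
 * completion notification must pass a real request set.
 *
 *      static int my_setattr_done(void *cookie, int rc)
 *      {
 *              complete(cookie);       // hypothetical: wake the submitter
 *              return rc;
 *      }
 *
 *      rc = osc_setattr_async(exp, oa, my_setattr_done, &done, rqset);
 */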

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie
 * may also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:        Export structure
 * @oa:         Attributes passed to OSS from client (obdo structure)
 * @upcall:     Completion callback invoked when the request finishes
 * @cookie:     Opaque caller data passed through to @upcall
 * @mode:       Operation done on given range.
 *
 * Only block allocation (standard preallocation) is supported currently;
 * other mode flags are not supported yet. ftruncate(2) and truncate(2)
 * are supported via SETATTR requests instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (standard prealloc) and FALLOC_FL_KEEP_SIZE
         * are supported for now. Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
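
/*
 * Behavior implied by the mode check in osc_fallocate_base(), for
 * reference: mode == 0 and mode == FALLOC_FL_KEEP_SIZE are sent to the
 * OST; any other flag (e.g. FALLOC_FL_PUNCH_HOLE) fails early with
 * -EOPNOTSUPP before a request is even allocated.
 */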

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matched by @mode in the resource found
 * by @oa. Found locks are added into the @cancels list. Returns the number
 * of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, where we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
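
/*
 * Why osc_can_send_destroy() does the inc-then-maybe-dec dance: the
 * increment optimistically claims a slot; if that pushes the count over
 * cl_max_rpcs_in_flight, the claim is rolled back, and a wakeup is issued
 * only when the decrement observes that another thread changed the
 * counter in between, so no waiter is left sleeping on a free slot.
 */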

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() that allowed the atomic_inc() is not
                 * covered by a lock, thus the two may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
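
/*
 * Summary of the wire fields filled in above by the grant protocol:
 * o_dirty is how much dirty data the client currently holds, o_undirty is
 * how much additional grant it would like, o_grant is the grant it still
 * has available (including reserved grant), and o_dropped is grant that
 * was lost and should be reclaimed by the server.
 */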

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
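
/*
 * Worked example of the target above (illustrative values): with
 * cl_max_rpcs_in_flight = 8 and 4MB RPCs (cl_max_pages_per_rpc = 1024 on
 * 4KB pages), the first shrink targets (8 + 1) * 4MB = 36MB; once the
 * available grant is already at or below that, the next shrink targets a
 * single RPC, i.e. 4MB.
 */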

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the desired limit.
         * We don't want to shrink below a single RPC, as that will
         * negatively impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

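/* upper bound on shrink RPCs sent in one pass of the grant work handler */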
#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
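                /*
                 * Round-up example (illustrative values): with 64KB chunks
                 * (cl_chunkbits = 16) on 4KB pages, chunk_mask = ~0xf, so
                 * 100 pages becomes (100 + 15) & ~15 = 112 pages, a
                 * multiple of the 16-page chunk.
                 */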
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
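
/*
 * Worked example (illustrative): a 3-page read of 3 x 4096 bytes that
 * comes back with nob_read = 5000 leaves page 0 intact (fully read),
 * zeroes bytes 904..4095 of page 1 (only 5000 - 4096 = 904 were read),
 * and zeroes page 2 entirely.
 */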

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
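
/*
 * Merge example (illustrative): two 4096-byte pages at offsets 0 and 4096
 * with identical flags merge into a single remote niobuf; a gap (offsets
 * 0 and 8192) or differing flags keeps them in separate niobufs.
 */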

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The space remaining in the guard buffer should be able to
                 * hold the checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                if (!pga[i]->pg->mapping)
                        /* bounce pages are unmapped */
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}
1372
1373 static int
1374 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1375                      u32 page_count, struct brw_page **pga,
1376                      struct ptlrpc_request **reqp, int resend)
1377 {
1378         struct ptlrpc_request *req;
1379         struct ptlrpc_bulk_desc *desc;
1380         struct ost_body *body;
1381         struct obd_ioobj *ioobj;
1382         struct niobuf_remote *niobuf;
1383         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1384         struct osc_brw_async_args *aa;
1385         struct req_capsule *pill;
1386         struct brw_page *pg_prev;
1387         void *short_io_buf;
1388         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1389         struct inode *inode;
1390
1391         ENTRY;
1392         inode = page2inode(pga[0]->pg);
1393         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1394                 RETURN(-ENOMEM); /* Recoverable */
1395         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1396                 RETURN(-EINVAL); /* Fatal */
1397
1398         if ((cmd & OBD_BRW_WRITE) != 0) {
1399                 opc = OST_WRITE;
1400                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1401                                                 osc_rq_pool,
1402                                                 &RQF_OST_BRW_WRITE);
1403         } else {
1404                 opc = OST_READ;
1405                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1406         }
1407         if (req == NULL)
1408                 RETURN(-ENOMEM);
1409
1410         if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1411                 for (i = 0; i < page_count; i++) {
1412                         struct brw_page *pg = pga[i];
1413                         struct page *data_page = NULL;
1414                         bool retried = false;
1415                         bool lockedbymyself;
1416                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1417
1418 retry_encrypt:
1419                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1420                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1421                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1422                         /* The page can already be locked when we arrive here.
1423                          * This is possible when cl_page_assume/vvp_page_assume
1424                          * is stuck on wait_on_page_writeback with page lock
1425                          * held. In this case there is no risk for the lock to
1426                          * be released while we are doing our encryption
1427                          * processing, because writeback against that page will
1428                          * end in vvp_page_completion_write/cl_page_completion,
1429                          * which means only once the page is fully processed.
1430                          */
1431                         lockedbymyself = trylock_page(pg->pg);
1432                         data_page =
1433                                 llcrypt_encrypt_pagecache_blocks(pg->pg,
1434                                                                  nunits, 0,
1435                                                                  GFP_NOFS);
1436                         if (lockedbymyself)
1437                                 unlock_page(pg->pg);
1438                         if (IS_ERR(data_page)) {
1439                                 rc = PTR_ERR(data_page);
1440                                 if (rc == -ENOMEM && !retried) {
1441                                         retried = true;
1442                                         rc = 0;
1443                                         goto retry_encrypt;
1444                                 }
1445                                 ptlrpc_request_free(req);
1446                                 RETURN(rc);
1447                         }
1448                         pg->pg = data_page;
1449                         /* there should be no gap in the middle of page array */
1450                         if (i == page_count - 1) {
1451                                 struct osc_async_page *oap = brw_page2oap(pg);
1452
1453                                 oa->o_size = oap->oap_count +
1454                                         oap->oap_obj_off + oap->oap_page_off;
1455                         }
1456                         /* len is forced to nunits, and the in-page offset
1457                          * to 0, so store the old cleartext info first
1458                          */
1459                         pg->bp_count_diff = nunits - pg->count;
1460                         pg->count = nunits;
1461                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1462                         pg->off = pg->off & PAGE_MASK;
1463                 }
1464         } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1465                 for (i = 0; i < page_count; i++) {
1466                         struct brw_page *pg = pga[i];
1467                         u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1468
1469                         if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1470                                 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1471                                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1472                         /* count/off are forced to cover whole encryption
1473                          * units, since that is how the encrypted data is
1474                          * stored on the OST; adjust bp_{count,off}_diff to
1475                          * remember the cleartext size and offset.
1476                          */
1477                         pg->bp_count_diff = nunits - pg->count;
1478                         pg->count = nunits;
1479                         pg->bp_off_diff = pg->off & ~PAGE_MASK;
1480                         pg->off = pg->off & PAGE_MASK;
1481                 }
1482         }
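             /* Note: for encrypted files the pga[] entries now cover whole
              * encryption units (and, for writes, point at bounce pages holding
              * ciphertext); bp_{count,off}_diff preserve the cleartext geometry
              * so it can be restored once the RPC completes (see
              * osc_release_bounce_pages() in brw_interpret()).
              */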
1483
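             /* count remote niobufs: pages that are contiguous in the file and
              * share flags (see can_merge_pages()) collapse into one niobuf */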
1484         for (niocount = i = 1; i < page_count; i++) {
1485                 if (!can_merge_pages(pga[i - 1], pga[i]))
1486                         niocount++;
1487         }
1488
1489         pill = &req->rq_pill;
1490         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1491                              sizeof(*ioobj));
1492         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1493                              niocount * sizeof(*niobuf));
1494
1495         for (i = 0; i < page_count; i++) {
1496                 short_io_size += pga[i]->count;
1497                 if (!inode || !IS_ENCRYPTED(inode)) {
1498                         pga[i]->bp_count_diff = 0;
1499                         pga[i]->bp_off_diff = 0;
1500                 }
1501         }
1502
1503         /* Check if read/write is small enough to be a short io. */
1504         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1505             !imp_connect_shortio(cli->cl_import))
1506                 short_io_size = 0;
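             /* With short io the payload travels inline in the request/reply
              * buffers (RMF_SHORT_IO below) instead of via a separate bulk
              * transfer; e.g. a single-niobuf 4KB write qualifies whenever
              * cl_max_short_io_bytes is at least 4096 and the server supports
              * short io. */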
1507
1508         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1509                              opc == OST_READ ? 0 : short_io_size);
1510         if (opc == OST_READ)
1511                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1512                                      short_io_size);
1513
1514         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1515         if (rc) {
1516                 ptlrpc_request_free(req);
1517                 RETURN(rc);
1518         }
1519         osc_set_io_portal(req);
1520
1521         ptlrpc_at_set_req_timeout(req);
1522         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1523          * retry logic */
1524         req->rq_no_retry_einprogress = 1;
1525
1526         if (short_io_size != 0) {
1527                 desc = NULL;
1528                 short_io_buf = NULL;
1529                 goto no_bulk;
1530         }
1531
1532         desc = ptlrpc_prep_bulk_imp(req, page_count,
1533                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1534                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1535                         PTLRPC_BULK_PUT_SINK),
1536                 OST_BULK_PORTAL,
1537                 &ptlrpc_bulk_kiov_pin_ops);
1538
1539         if (desc == NULL)
1540                 GOTO(out, rc = -ENOMEM);
1541         /* NB the request now owns desc and will free it when the request is freed */
1542 no_bulk:
1543         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1544         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1545         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1546         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1547
1548         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1549
1550         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1551          * and from_kgid(), because these RPCs are built asynchronously, outside
1552          * the originating process context. Fortunately, variable oa already
1553          * contains valid o_uid and o_gid for these two operations, and filling
1554          * them is enough for nrs-tbf, see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID
1555          * are not set, in order to avoid breaking other request handling logic */
1556         body->oa.o_uid = oa->o_uid;
1557         body->oa.o_gid = oa->o_gid;
1558
1559         obdo_to_ioobj(oa, ioobj);
1560         ioobj->ioo_bufcnt = niocount;
1561         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1562          * bulks that might be sent for this request.  The actual number is
1563          * decided when the RPC is finally sent in ptlrpc_register_bulk(). We
1564          * send "max - 1" for compatibility with old clients that send "0", and
1565          * also so the actual maximum is a power-of-two number, not one less. LU-1431 */
1566         if (desc != NULL)
1567                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1568         else /* short io */
1569                 ioobj_max_brw_set(ioobj, 0);
1570
1571         if (short_io_size != 0) {
1572                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1573                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1574                         body->oa.o_flags = 0;
1575                 }
1576                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1577                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1578                        short_io_size);
1579                 if (opc == OST_WRITE) {
1580                         short_io_buf = req_capsule_client_get(pill,
1581                                                               &RMF_SHORT_IO);
1582                         LASSERT(short_io_buf != NULL);
1583                 }
1584         }
1585
1586         LASSERT(page_count > 0);
1587         pg_prev = pga[0];
1588         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1589                 struct brw_page *pg = pga[i];
1590                 int poff = pg->off & ~PAGE_MASK;
1591
1592                 LASSERT(pg->count > 0);
1593                 /* make sure there is no gap in the middle of the page array */
1594                 LASSERTF(page_count == 1 ||
1595                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1596                           ergo(i > 0 && i < page_count - 1,
1597                                poff == 0 && pg->count == PAGE_SIZE)   &&
1598                           ergo(i == page_count - 1, poff == 0)),
1599                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1600                          i, page_count, pg, pg->off, pg->count);
1601                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1602                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1603                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1604                          i, page_count,
1605                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1606                          pg_prev->pg, page_private(pg_prev->pg),
1607                          pg_prev->pg->index, pg_prev->off);
1608                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1609                         (pg->flag & OBD_BRW_SRVLOCK));
1610                 if (short_io_size != 0 && opc == OST_WRITE) {
1611                         unsigned char *ptr = kmap_atomic(pg->pg);
1612
1613                         LASSERT(short_io_size >= requested_nob + pg->count);
1614                         memcpy(short_io_buf + requested_nob,
1615                                ptr + poff,
1616                                pg->count);
1617                         kunmap_atomic(ptr);
1618                 } else if (short_io_size == 0) {
1619                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1620                                                          pg->count);
1621                 }
1622                 requested_nob += pg->count;
1623
1624                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1625                         niobuf--;
1626                         niobuf->rnb_len += pg->count;
1627                 } else {
1628                         niobuf->rnb_offset = pg->off;
1629                         niobuf->rnb_len    = pg->count;
1630                         niobuf->rnb_flags  = pg->flag;
1631                 }
1632                 pg_prev = pg;
1633         }
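             /* e.g. two file-contiguous 4KB pages collapse into a single 8KB
              * niobuf above: the merge branch only extends rnb_len instead of
              * starting a new remote niobuf */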
1634
1635         LASSERTF((void *)(niobuf - niocount) ==
1636                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1637                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1638                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1639
1640         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1641         if (resend) {
1642                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1643                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1644                         body->oa.o_flags = 0;
1645                 }
1646                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1647         }
1648
1649         if (osc_should_shrink_grant(cli))
1650                 osc_shrink_grant_local(cli, &body->oa);
1651
1652         /* size[REQ_REC_OFF] still sizeof (*body) */
1653         if (opc == OST_WRITE) {
1654                 if (cli->cl_checksum &&
1655                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1656                         /* store cl_cksum_type in a local variable since
1657                          * it can be changed via lprocfs */
1658                         enum cksum_types cksum_type = cli->cl_cksum_type;
1659
1660                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1661                                 body->oa.o_flags = 0;
1662
1663                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1664                                                                 cksum_type);
1665                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1666
1667                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1668                                                   requested_nob, page_count,
1669                                                   pga, OST_WRITE,
1670                                                   &body->oa.o_cksum);
1671                         if (rc < 0) {
1672                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1673                                        rc);
1674                                 GOTO(out, rc);
1675                         }
1676                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1677                                body->oa.o_cksum);
1678
1679                         /* save this in 'oa', too, for later checking */
1680                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1681                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1682                                                            cksum_type);
1683                 } else {
1684                         /* clear out the checksum flag, in case this is a
1685                          * resend but cl_checksum is no longer set. b=11238 */
1686                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1687                 }
1688                 oa->o_cksum = body->oa.o_cksum;
1689                 /* 1 RC per niobuf */
1690                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1691                                      sizeof(__u32) * niocount);
1692         } else {
1693                 if (cli->cl_checksum &&
1694                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1695                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1696                                 body->oa.o_flags = 0;
1697                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1698                                 cli->cl_cksum_type);
1699                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1700                 }
1701
1702                 /* The client cksum has already been copied to the wire obdo by
1703                  * the earlier lustre_set_wire_obdo(); in case a bulk-read is
1704                  * being resent due to a cksum error, this allows the server to
1705                  * check+dump the pages on its side */
1706         }
1707         ptlrpc_request_set_replen(req);
1708
1709         aa = ptlrpc_req_async_args(aa, req);
1710         aa->aa_oa = oa;
1711         aa->aa_requested_nob = requested_nob;
1712         aa->aa_nio_count = niocount;
1713         aa->aa_page_count = page_count;
1714         aa->aa_resends = 0;
1715         aa->aa_ppga = pga;
1716         aa->aa_cli = cli;
1717         INIT_LIST_HEAD(&aa->aa_oaps);
1718
1719         *reqp = req;
1720         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1721         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1722                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1723                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1724         RETURN(0);
1725
1726  out:
1727         ptlrpc_req_finished(req);
1728         RETURN(rc);
1729 }
1730
1731 char dbgcksum_file_name[PATH_MAX];
1732
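     /* Dump the raw pages of a bulk transfer that failed its checksum check to
      * a file for offline analysis. The dump lands under
      * libcfs_debug_file_path_arr (typically /tmp/lustre-log), with the FID,
      * extent and both checksums encoded in the file name built below. */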
1733 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1734                                 struct brw_page **pga, __u32 server_cksum,
1735                                 __u32 client_cksum)
1736 {
1737         struct file *filp;
1738         int rc, i;
1739         unsigned int len;
1740         char *buf;
1741
1742         /* only keep a dump of the pages on the first error for a given range
1743          * in a file/fid; resends/retries fail the O_EXCL open with -EEXIST. */
1744         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1745                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1746                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1747                   libcfs_debug_file_path_arr :
1748                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1749                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1750                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1751                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1752                  pga[0]->off,
1753                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1754                  client_cksum, server_cksum);
1755         filp = filp_open(dbgcksum_file_name,
1756                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1757         if (IS_ERR(filp)) {
1758                 rc = PTR_ERR(filp);
1759                 if (rc == -EEXIST)
1760                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1761                                "checksum error: rc = %d\n", dbgcksum_file_name,
1762                                rc);
1763                 else
1764                         CERROR("%s: can't open to dump pages with checksum "
1765                                "error: rc = %d\n", dbgcksum_file_name, rc);
1766                 return;
1767         }
1768
1769         for (i = 0; i < page_count; i++) {
1770                 len = pga[i]->count;
1771                 buf = kmap(pga[i]->pg);
1772                 while (len != 0) {
1773                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1774                         if (rc < 0) {
1775                                 CERROR("%s: wanted to write %u bytes but got "
1776                                        "error %d\n", dbgcksum_file_name, len, rc);
1777                                 break;
1778                         }
1779                         len -= rc;
1780                         buf += rc;
1781                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1782                                dbgcksum_file_name, rc);
1783                 }
1784                 kunmap(pga[i]->pg);
1785         }
1786
1787         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1788         if (rc)
1789                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1790         filp_close(filp, NULL);
1791 }
1792
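     /* Re-checksum the pages of a write whose reply carried a server checksum
      * different from the one computed at submission time, to narrow down where
      * the data changed. Returns 0 if the checksums actually match, 1 on a
      * confirmed mismatch (the caller then returns -EAGAIN to trigger a resend). */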
1793 static int
1794 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1795                      __u32 client_cksum, __u32 server_cksum,
1796                      struct osc_brw_async_args *aa)
1797 {
1798         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1799         enum cksum_types cksum_type;
1800         obd_dif_csum_fn *fn = NULL;
1801         int sector_size = 0;
1802         __u32 new_cksum;
1803         char *msg;
1804         int rc;
1805
1806         if (server_cksum == client_cksum) {
1807                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1808                 return 0;
1809         }
1810
1811         if (aa->aa_cli->cl_checksum_dump)
1812                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1813                                     server_cksum, client_cksum);
1814
1815         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1816                                            oa->o_flags : 0);
1817
1818         switch (cksum_type) {
1819         case OBD_CKSUM_T10IP512:
1820                 fn = obd_dif_ip_fn;
1821                 sector_size = 512;
1822                 break;
1823         case OBD_CKSUM_T10IP4K:
1824                 fn = obd_dif_ip_fn;
1825                 sector_size = 4096;
1826                 break;
1827         case OBD_CKSUM_T10CRC512:
1828                 fn = obd_dif_crc_fn;
1829                 sector_size = 512;
1830                 break;
1831         case OBD_CKSUM_T10CRC4K:
1832                 fn = obd_dif_crc_fn;
1833                 sector_size = 4096;
1834                 break;
1835         default:
1836                 break;
1837         }
1838
1839         if (fn)
1840                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1841                                              aa->aa_page_count, aa->aa_ppga,
1842                                              OST_WRITE, fn, sector_size,
1843                                              &new_cksum);
1844         else
1845                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1846                                        aa->aa_ppga, OST_WRITE, cksum_type,
1847                                        &new_cksum);
1848
1849         if (rc < 0)
1850                 msg = "failed to calculate the client write checksum";
1851         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1852                 msg = "the server did not use the checksum type specified in "
1853                       "the original request - likely a protocol problem";
1854         else if (new_cksum == server_cksum)
1855                 msg = "changed on the client after we checksummed it - "
1856                       "likely false positive due to mmap IO (bug 11742)";
1857         else if (new_cksum == client_cksum)
1858                 msg = "changed in transit before arrival at OST";
1859         else
1860                 msg = "changed in transit AND doesn't match the original - "
1861                       "likely false positive due to mmap IO (bug 11742)";
1862
1863         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1864                            DFID " object "DOSTID" extent [%llu-%llu], original "
1865                            "client csum %x (type %x), server csum %x (type %x),"
1866                            " client csum now %x\n",
1867                            obd_name, msg, libcfs_nid2str(peer->nid),
1868                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1869                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1870                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1871                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1872                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1873                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1874                            client_cksum,
1875                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1876                            server_cksum, cksum_type, new_cksum);
1877         return 1;
1878 }
1879
1880 /* Note: rc enters this function as the number of bytes transferred */
1881 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1882 {
1883         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1884         struct client_obd *cli = aa->aa_cli;
1885         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1886         const struct lnet_process_id *peer =
1887                 &req->rq_import->imp_connection->c_peer;
1888         struct ost_body *body;
1889         u32 client_cksum = 0;
1890         struct inode *inode;
1891
1892         ENTRY;
1893
1894         if (rc < 0 && rc != -EDQUOT) {
1895                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1896                 RETURN(rc);
1897         }
1898
1899         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1900         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1901         if (body == NULL) {
1902                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1903                 RETURN(-EPROTO);
1904         }
1905
1906         /* set/clear over quota flag for a uid/gid/projid */
1907         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1908             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1909                 unsigned qid[LL_MAXQUOTAS] = {
1910                                          body->oa.o_uid, body->oa.o_gid,
1911                                          body->oa.o_projid };
1912                 CDEBUG(D_QUOTA,
1913                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1914                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1915                        body->oa.o_valid, body->oa.o_flags);
1916                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1917                                 body->oa.o_flags);
1918         }
1919
1920         osc_update_grant(cli, body);
1921
1922         if (rc < 0)
1923                 RETURN(rc);
1924
1925         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1926                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1927
1928         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1929                 if (rc > 0) {
1930                         CERROR("%s: unexpected positive size %d\n",
1931                                obd_name, rc);
1932                         RETURN(-EPROTO);
1933                 }
1934
1935                 if (req->rq_bulk != NULL &&
1936                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1937                         RETURN(-EAGAIN);
1938
1939                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1940                     check_write_checksum(&body->oa, peer, client_cksum,
1941                                          body->oa.o_cksum, aa))
1942                         RETURN(-EAGAIN);
1943
1944                 rc = check_write_rcs(req, aa->aa_requested_nob,
1945                                      aa->aa_nio_count, aa->aa_page_count,
1946                                      aa->aa_ppga);
1947                 GOTO(out, rc);
1948         }
1949
1950         /* The rest of this function executes only for OST_READs */
1951
1952         if (req->rq_bulk == NULL) {
1953                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1954                                           RCL_SERVER);
1955                 LASSERT(rc == req->rq_status);
1956         } else {
1957                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1958                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1959         }
1960         if (rc < 0)
1961                 GOTO(out, rc = -EAGAIN);
1962
1963         if (rc > aa->aa_requested_nob) {
1964                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1965                        rc, aa->aa_requested_nob);
1966                 RETURN(-EPROTO);
1967         }
1968
1969         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1970                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1971                        rc, req->rq_bulk->bd_nob_transferred);
1972                 RETURN(-EPROTO);
1973         }
1974
1975         if (req->rq_bulk == NULL) {
1976                 /* short io */
1977                 int nob, pg_count, i = 0;
1978                 unsigned char *buf;
1979
1980                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1981                 pg_count = aa->aa_page_count;
1982                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1983                                                    rc);
1984                 nob = rc;
1985                 while (nob > 0 && pg_count > 0) {
1986                         unsigned char *ptr;
1987                         int count = aa->aa_ppga[i]->count > nob ?
1988                                     nob : aa->aa_ppga[i]->count;
1989
1990                         CDEBUG(D_CACHE, "page %p count %d\n",
1991                                aa->aa_ppga[i]->pg, count);
1992                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1993                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1994                                count);
1995                         kunmap_atomic((void *) ptr);
1996
1997                         buf += count;
1998                         nob -= count;
1999                         i++;
2000                         pg_count--;
2001                 }
2002         }
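             /* a short read is not an error: handle_short_read() below
              * zero-fills whatever part of the requested pages the server did
              * not return */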
2003
2004         if (rc < aa->aa_requested_nob)
2005                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2006
2007         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2008                 static int cksum_counter;
2009                 u32        server_cksum = body->oa.o_cksum;
2010                 char      *via = "";
2011                 char      *router = "";
2012                 enum cksum_types cksum_type;
2013                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2014                         body->oa.o_flags : 0;
2015
2016                 cksum_type = obd_cksum_type_unpack(o_flags);
2017                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2018                                           aa->aa_page_count, aa->aa_ppga,
2019                                           OST_READ, &client_cksum);
2020                 if (rc < 0)
2021                         GOTO(out, rc);
2022
2023                 if (req->rq_bulk != NULL &&
2024                     peer->nid != req->rq_bulk->bd_sender) {
2025                         via = " via ";
2026                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
2027                 }
2028
2029                 if (server_cksum != client_cksum) {
2030                         struct ost_body *clbody;
2031                         u32 page_count = aa->aa_page_count;
2032
2033                         clbody = req_capsule_client_get(&req->rq_pill,
2034                                                         &RMF_OST_BODY);
2035                         if (cli->cl_checksum_dump)
2036                                 dump_all_bulk_pages(&clbody->oa, page_count,
2037                                                     aa->aa_ppga, server_cksum,
2038                                                     client_cksum);
2039
2040                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2041                                            "%s%s%s inode "DFID" object "DOSTID
2042                                            " extent [%llu-%llu], client %x, "
2043                                            "server %x, cksum_type %x\n",
2044                                            obd_name,
2045                                            libcfs_nid2str(peer->nid),
2046                                            via, router,
2047                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2048                                                 clbody->oa.o_parent_seq : 0ULL,
2049                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2050                                                 clbody->oa.o_parent_oid : 0,
2051                                            clbody->oa.o_valid & OBD_MD_FLFID ?
2052                                                 clbody->oa.o_parent_ver : 0,
2053                                            POSTID(&body->oa.o_oi),
2054                                            aa->aa_ppga[0]->off,
2055                                            aa->aa_ppga[page_count-1]->off +
2056                                            aa->aa_ppga[page_count-1]->count - 1,
2057                                            client_cksum, server_cksum,
2058                                            cksum_type);
2059                         cksum_counter = 0;
2060                         aa->aa_oa->o_cksum = client_cksum;
2061                         rc = -EAGAIN;
2062                 } else {
2063                         cksum_counter++;
2064                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2065                         rc = 0;
2066                 }
2067         } else if (unlikely(client_cksum)) {
2068                 static int cksum_missed;
2069
2070                 cksum_missed++;
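                     /* x & -x == x only when x is a power of two, so this
                      * rate-limits the message to the 1st, 2nd, 4th, 8th, ...
                      * missed checksum */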
2071                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2072                         CERROR("%s: checksum %u requested from %s but not sent\n",
2073                                obd_name, cksum_missed,
2074                                libcfs_nid2str(peer->nid));
2075         } else {
2076                 rc = 0;
2077         }
2078
2079         inode = page2inode(aa->aa_ppga[0]->pg);
2080         if (inode && IS_ENCRYPTED(inode)) {
2081                 int idx;
2082
2083                 if (!llcrypt_has_encryption_key(inode)) {
2084                         CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2085                         GOTO(out, rc);
2086                 }
2087                 for (idx = 0; idx < aa->aa_page_count; idx++) {
2088                         struct brw_page *pg = aa->aa_ppga[idx];
2089                         unsigned int offs = 0;
2090
2091                         while (offs < PAGE_SIZE) {
2092                                 /* do not decrypt if page is all 0s */
2093                                 if (memchr_inv(page_address(pg->pg) + offs, 0,
2094                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2095                                         /* if page is empty forward info to
2096                                          * upper layers (ll_io_zero_page) by
2097                                          * clearing PagePrivate2
2098                                          */
2099                                         if (!offs)
2100                                                 ClearPagePrivate2(pg->pg);
2101                                         break;
2102                                 }
2103
2104                                 /* The page is already locked when we arrive here,
2105                                  * except when we deal with a twisted page for
2106                                  * specific Direct IO support, in which case
2107                                  * the PageChecked flag is set on the page.
2108                                  */
2109                                 if (PageChecked(pg->pg))
2110                                         lock_page(pg->pg);
2111                                 rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2112                                                     LUSTRE_ENCRYPTION_UNIT_SIZE,
2113                                                                       offs);
2114                                 if (PageChecked(pg->pg))
2115                                         unlock_page(pg->pg);
2116                                 if (rc)
2117                                         GOTO(out, rc);
2118
2119                                 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2120                         }
2121                 }
2122         }
2123
2124 out:
2125         if (rc >= 0)
2126                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2127                                      aa->aa_oa, &body->oa);
2128
2129         RETURN(rc);
2130 }
2131
2132 static int osc_brw_redo_request(struct ptlrpc_request *request,
2133                                 struct osc_brw_async_args *aa, int rc)
2134 {
2135         struct ptlrpc_request *new_req;
2136         struct osc_brw_async_args *new_aa;
2137         struct osc_async_page *oap;
2138         ENTRY;
2139
2140         /* The message below is checked in replay-ost-single.sh test_8ae */
2141         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2142                   "redo for recoverable error %d", rc);
2143
2144         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2145                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2146                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2147                                   aa->aa_ppga, &new_req, 1);
2148         if (rc)
2149                 RETURN(rc);
2150
2151         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2152                 if (oap->oap_request != NULL) {
2153                         LASSERTF(request == oap->oap_request,
2154                                  "request %p != oap_request %p\n",
2155                                  request, oap->oap_request);
2156                 }
2157         }
2158         /*
2159          * New request takes over pga and oaps from old request.
2160          * Note that copying a list_head doesn't work, need to move it...
2161          */
2162         aa->aa_resends++;
2163         new_req->rq_interpret_reply = request->rq_interpret_reply;
2164         new_req->rq_async_args = request->rq_async_args;
2165         new_req->rq_commit_cb = request->rq_commit_cb;
2166         /* cap resend delay to the current request timeout; this is similar to
2167          * what ptlrpc does (see after_reply()) */
2168         if (aa->aa_resends > new_req->rq_timeout)
2169                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2170         else
2171                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2172         new_req->rq_generation_set = 1;
2173         new_req->rq_import_generation = request->rq_import_generation;
2174
2175         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2176
2177         INIT_LIST_HEAD(&new_aa->aa_oaps);
2178         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2179         INIT_LIST_HEAD(&new_aa->aa_exts);
2180         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2181         new_aa->aa_resends = aa->aa_resends;
2182
2183         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2184                 if (oap->oap_request) {
2185                         ptlrpc_req_finished(oap->oap_request);
2186                         oap->oap_request = ptlrpc_request_addref(new_req);
2187                 }
2188         }
2189
2190         /* XXX: This code will run into problems if we ever want to support
2191          * adding a series of BRW RPCs to a self-defined ptlrpc_request_set
2192          * and waiting for all of them to finish. We should inherit the
2193          * request set from the old request. */
2194         ptlrpcd_add_req(new_req);
2195
2196         DEBUG_REQ(D_INFO, new_req, "new request");
2197         RETURN(0);
2198 }
2199
2200 /*
2201  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2202  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2203  * fine for our small page arrays and doesn't require allocation.  It's an
2204  * insertion sort that swaps elements that are strides apart, shrinking the
2205  * stride down until it's 1 and the array is sorted.
2206  */
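     /* The stride*3+1 rule below generates the gap sequence 1, 4, 13, 40, ...;
      * e.g. for num = 10 the initial stride overshoots to 13 and the sorting
      * passes then run at gaps 4 and 1. */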
2207 static void sort_brw_pages(struct brw_page **array, int num)
2208 {
2209         int stride, i, j;
2210         struct brw_page *tmp;
2211
2212         if (num == 1)
2213                 return;
2214         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2215                 ;
2216
2217         do {
2218                 stride /= 3;
2219                 for (i = stride ; i < num ; i++) {
2220                         tmp = array[i];
2221                         j = i;
2222                         while (j >= stride && array[j - stride]->off > tmp->off) {
2223                                 array[j] = array[j - stride];
2224                                 j -= stride;
2225                         }
2226                         array[j] = tmp;
2227                 }
2228         } while (stride > 1);
2229 }
2230
2231 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2232 {
2233         LASSERT(ppga != NULL);
2234         OBD_FREE_PTR_ARRAY(ppga, count);
2235 }
2236
2237 static int brw_interpret(const struct lu_env *env,
2238                          struct ptlrpc_request *req, void *args, int rc)
2239 {
2240         struct osc_brw_async_args *aa = args;
2241         struct osc_extent *ext;
2242         struct osc_extent *tmp;
2243         struct client_obd *cli = aa->aa_cli;
2244         unsigned long transferred = 0;
2245
2246         ENTRY;
2247
2248         rc = osc_brw_fini_request(req, rc);
2249         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2250
2251         /* restore clear text pages */
2252         osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2253
2254         /*
2255          * When server returns -EINPROGRESS, client should always retry
2256          * regardless of the number of times the bulk was resent already.
2257          */
2258         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2259                 if (req->rq_import_generation !=
2260                     req->rq_import->imp_generation) {
2261                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2262                                ""DOSTID", rc = %d.\n",
2263                                req->rq_import->imp_obd->obd_name,
2264                                POSTID(&aa->aa_oa->o_oi), rc);
2265                 } else if (rc == -EINPROGRESS ||
2266                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2267                         rc = osc_brw_redo_request(req, aa, rc);
2268                 } else {
2269                         CERROR("%s: too many resent retries for object: "
2270                                "%llu:%llu, rc = %d.\n",
2271                                req->rq_import->imp_obd->obd_name,
2272                                POSTID(&aa->aa_oa->o_oi), rc);
2273                 }
2274
2275                 if (rc == 0)
2276                         RETURN(0);
2277                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2278                         rc = -EIO;
2279         }
2280
2281         if (rc == 0) {
2282                 struct obdo *oa = aa->aa_oa;
2283                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2284                 unsigned long valid = 0;
2285                 struct cl_object *obj;
2286                 struct osc_async_page *last;
2287
2288                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2289                 obj = osc2cl(last->oap_obj);
2290
2291                 cl_object_attr_lock(obj);
2292                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2293                         attr->cat_blocks = oa->o_blocks;
2294                         valid |= CAT_BLOCKS;
2295                 }
2296                 if (oa->o_valid & OBD_MD_FLMTIME) {
2297                         attr->cat_mtime = oa->o_mtime;
2298                         valid |= CAT_MTIME;
2299                 }
2300                 if (oa->o_valid & OBD_MD_FLATIME) {
2301                         attr->cat_atime = oa->o_atime;
2302                         valid |= CAT_ATIME;
2303                 }
2304                 if (oa->o_valid & OBD_MD_FLCTIME) {
2305                         attr->cat_ctime = oa->o_ctime;
2306                         valid |= CAT_CTIME;
2307                 }
2308
2309                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2310                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2311                         loff_t last_off = last->oap_count + last->oap_obj_off +
2312                                 last->oap_page_off;
2313
2314                         /* Change the file size if this is an out-of-quota or
2315                          * direct IO write and it extends the file size */
2316                         if (loi->loi_lvb.lvb_size < last_off) {
2317                                 attr->cat_size = last_off;
2318                                 valid |= CAT_SIZE;
2319                         }
2320                         /* Extend KMS if it's not a lockless write */
2321                         if (loi->loi_kms < last_off &&
2322                             oap2osc_page(last)->ops_srvlock == 0) {
2323                                 attr->cat_kms = last_off;
2324                                 valid |= CAT_KMS;
2325                         }
2326                 }
2327
2328                 if (valid != 0)
2329                         cl_object_attr_update(env, obj, attr, valid);
2330                 cl_object_attr_unlock(obj);
2331         }
2332         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2333         aa->aa_oa = NULL;
2334
2335         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2336                 osc_inc_unstable_pages(req);
2337
2338         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2339                 list_del_init(&ext->oe_link);
2340                 osc_extent_finish(env, ext, 1,
2341                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2342         }
2343         LASSERT(list_empty(&aa->aa_exts));
2344         LASSERT(list_empty(&aa->aa_oaps));
2345
2346         transferred = (req->rq_bulk == NULL ? /* short io */
2347                        aa->aa_requested_nob :
2348                        req->rq_bulk->bd_nob_transferred);
2349
2350         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2351         ptlrpc_lprocfs_brw(req, transferred);
2352
2353         spin_lock(&cli->cl_loi_list_lock);
2354         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2355          * is called so we know whether to go to sync BRWs or wait for more
2356          * RPCs to complete */
2357         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2358                 cli->cl_w_in_flight--;
2359         else
2360                 cli->cl_r_in_flight--;
2361         osc_wake_cache_waiters(cli);
2362         spin_unlock(&cli->cl_loi_list_lock);
2363
2364         osc_io_unplug(env, cli, NULL);
2365         RETURN(rc);
2366 }
2367
2368 static void brw_commit(struct ptlrpc_request *req)
2369 {
2370         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2371          * this function being called via rq_commit_cb, we need to ensure
2372          * osc_dec_unstable_pages is still called. Otherwise unstable
2373          * pages may be leaked. */
2374         spin_lock(&req->rq_lock);
2375         if (likely(req->rq_unstable)) {
2376                 req->rq_unstable = 0;
2377                 spin_unlock(&req->rq_lock);
2378
2379                 osc_dec_unstable_pages(req);
2380         } else {
2381                 req->rq_committed = 1;
2382                 spin_unlock(&req->rq_lock);
2383         }
2384 }
2385
2386 /**
2387  * Build an RPC from the list of extents @ext_list. The caller must ensure
2388  * that the total number of pages in this list is NOT over the max pages
2389  * per RPC. Extents in the list must be in OES_RPC state.
2390  */
2391 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2392                   struct list_head *ext_list, int cmd)
2393 {
2394         struct ptlrpc_request           *req = NULL;
2395         struct osc_extent               *ext;
2396         struct brw_page                 **pga = NULL;
2397         struct osc_brw_async_args       *aa = NULL;
2398         struct obdo                     *oa = NULL;
2399         struct osc_async_page           *oap;
2400         struct osc_object               *obj = NULL;
2401         struct cl_req_attr              *crattr = NULL;
2402         loff_t                          starting_offset = OBD_OBJECT_EOF;
2403         loff_t                          ending_offset = 0;
2404         /* '1' for consistency with code that checks !mpflag to restore */
2405         int mpflag = 1;
2406         int                             mem_tight = 0;
2407         int                             page_count = 0;
2408         bool                            soft_sync = false;
2409         bool                            ndelay = false;
2410         int                             i;
2411         int                             grant = 0;
2412         int                             rc;
2413         __u32                           layout_version = 0;
2414         LIST_HEAD(rpc_list);
2415         struct ost_body                 *body;
2416         ENTRY;
2417         LASSERT(!list_empty(ext_list));
2418
2419         /* add pages into rpc_list to build BRW rpc */
2420         list_for_each_entry(ext, ext_list, oe_link) {
2421                 LASSERT(ext->oe_state == OES_RPC);
2422                 mem_tight |= ext->oe_memalloc;
2423                 grant += ext->oe_grants;
2424                 page_count += ext->oe_nr_pages;
2425                 layout_version = max(layout_version, ext->oe_layout_version);
2426                 if (obj == NULL)
2427                         obj = ext->oe_obj;
2428         }
2429
2430         soft_sync = osc_over_unstable_soft_limit(cli);
2431         if (mem_tight)
2432                 mpflag = memalloc_noreclaim_save();
2433
2434         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2435         if (pga == NULL)
2436                 GOTO(out, rc = -ENOMEM);
2437
2438         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2439         if (oa == NULL)
2440                 GOTO(out, rc = -ENOMEM);
2441
2442         i = 0;
2443         list_for_each_entry(ext, ext_list, oe_link) {
2444                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2445                         if (mem_tight)
2446                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2447                         if (soft_sync)
2448                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2449                         pga[i] = &oap->oap_brw_page;
2450                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2451                         i++;
2452
2453                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2454                         if (starting_offset == OBD_OBJECT_EOF ||
2455                             starting_offset > oap->oap_obj_off)
2456                                 starting_offset = oap->oap_obj_off;
2457                         else
2458                                 LASSERT(oap->oap_page_off == 0);
2459                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2460                                 ending_offset = oap->oap_obj_off +
2461                                                 oap->oap_count;
2462                         else
2463                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2464                                         PAGE_SIZE);
2465                 }
2466                 if (ext->oe_ndelay)
2467                         ndelay = true;
2468         }
2469
2470         /* first page in the list */
2471         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2472
2473         crattr = &osc_env_info(env)->oti_req_attr;
2474         memset(crattr, 0, sizeof(*crattr));
2475         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2476         crattr->cra_flags = ~0ULL;
2477         crattr->cra_page = oap2cl_page(oap);
2478         crattr->cra_oa = oa;
2479         cl_req_attr_set(env, osc2cl(obj), crattr);
2480
2481         if (cmd == OBD_BRW_WRITE) {
2482                 oa->o_grant_used = grant;
2483                 if (layout_version > 0) {
2484                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2485                                PFID(&oa->o_oi.oi_fid), layout_version);
2486
2487                         oa->o_layout_version = layout_version;
2488                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2489                 }
2490         }
2491
2492         sort_brw_pages(pga, page_count);
2493         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2494         if (rc != 0) {
2495                 CERROR("prep_req failed: %d\n", rc);
2496                 GOTO(out, rc);
2497         }
2498
2499         req->rq_commit_cb = brw_commit;
2500         req->rq_interpret_reply = brw_interpret;
2501         req->rq_memalloc = mem_tight != 0;
2502         oap->oap_request = ptlrpc_request_addref(req);
2503         if (ndelay) {
2504                 req->rq_no_resend = req->rq_no_delay = 1;
2505                 /* probably set a shorter timeout value here,
2506                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2507                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2508         }
2509
2510         /* Need to update the timestamps after the request is built, in case
2511          * we race with setattr (locally or in the queue at the OST).  If the
2512          * OST gets a later setattr before an earlier BRW (as determined by the
2513          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2514          * is no obvious way to do this in a single call.  bug 10150 */
2515         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2516         crattr->cra_oa = &body->oa;
2517         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2518         cl_req_attr_set(env, osc2cl(obj), crattr);
2519         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2520
2521         aa = ptlrpc_req_async_args(aa, req);
2522         INIT_LIST_HEAD(&aa->aa_oaps);
2523         list_splice_init(&rpc_list, &aa->aa_oaps);
2524         INIT_LIST_HEAD(&aa->aa_exts);
2525         list_splice_init(ext_list, &aa->aa_exts);
2526
2527         spin_lock(&cli->cl_loi_list_lock);
2528         starting_offset >>= PAGE_SHIFT;
2529         if (cmd == OBD_BRW_READ) {
2530                 cli->cl_r_in_flight++;
2531                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2532                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2533                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2534                                       starting_offset + 1);
2535         } else {
2536                 cli->cl_w_in_flight++;
2537                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2538                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2539                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2540                                       starting_offset + 1);
2541         }
2542         spin_unlock(&cli->cl_loi_list_lock);
2543
2544         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2545                   page_count, aa, cli->cl_r_in_flight,
2546                   cli->cl_w_in_flight);
2547         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2548
2549         ptlrpcd_add_req(req);
2550         rc = 0;
2551         EXIT;
2552
2553 out:
2554         if (mem_tight)
2555                 memalloc_noreclaim_restore(mpflag);
2556
2557         if (rc != 0) {
2558                 LASSERT(req == NULL);
2559
2560                 if (oa)
2561                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2562                 if (pga) {
2563                         osc_release_bounce_pages(pga, page_count);
2564                         osc_release_ppga(pga, page_count);
2565                 }
2566                 /* this should happen rarely and is pretty bad: it makes the
2567                  * pending list not follow the dirty order */
2568                 while (!list_empty(ext_list)) {
2569                         ext = list_entry(ext_list->next, struct osc_extent,
2570                                          oe_link);
2571                         list_del_init(&ext->oe_link);
2572                         osc_extent_finish(env, ext, 0, rc);
2573                 }
2574         }
2575         RETURN(rc);
2576 }
2577
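     /* Atomically claim lock->l_ast_data for @data: returns 1 if l_ast_data
      * was unset (and now points at @data) or already equal to @data, and 0 if
      * another owner got there first. */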
2578 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2579 {
2580         int set = 0;
2581
2582         LASSERT(lock != NULL);
2583
2584         lock_res_and_lock(lock);
2585
2586         if (lock->l_ast_data == NULL)
2587                 lock->l_ast_data = data;
2588         if (lock->l_ast_data == data)
2589                 set = 1;
2590
2591         unlock_res_and_lock(lock);
2592
2593         return set;
2594 }
2595
2596 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2597                      void *cookie, struct lustre_handle *lockh,
2598                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2599                      int errcode)
2600 {
2601         bool intent = *flags & LDLM_FL_HAS_INTENT;
2602         int rc;
2603         ENTRY;
2604
2605         /* The request was created before the ldlm_cli_enqueue() call. */
2606         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2607                 struct ldlm_reply *rep;
2608
2609                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2610                 LASSERT(rep != NULL);
2611
2612                 rep->lock_policy_res1 =
2613                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2614                 if (rep->lock_policy_res1)
2615                         errcode = rep->lock_policy_res1;
2616                 if (!speculative)
2617                         *flags |= LDLM_FL_LVB_READY;
2618         } else if (errcode == ELDLM_OK) {
2619                 *flags |= LDLM_FL_LVB_READY;
2620         }
2621
2622         /* Call the update callback. */
2623         rc = (*upcall)(cookie, lockh, errcode);
2624
2625         /* release the reference taken in ldlm_cli_enqueue() */
2626         if (errcode == ELDLM_LOCK_MATCHED)
2627                 errcode = ELDLM_OK;
2628         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2629                 ldlm_lock_decref(lockh, mode);
2630
2631         RETURN(rc);
2632 }
2633
2634 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2635                           void *args, int rc)
2636 {
2637         struct osc_enqueue_args *aa = args;
2638         struct ldlm_lock *lock;
2639         struct lustre_handle *lockh = &aa->oa_lockh;
2640         enum ldlm_mode mode = aa->oa_mode;
2641         struct ost_lvb *lvb = aa->oa_lvb;
2642         __u32 lvb_len = sizeof(*lvb);
2643         __u64 flags = 0;
2644
2645         ENTRY;
2646
2647         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2648          * be valid. */
2649         lock = ldlm_handle2lock(lockh);
2650         LASSERTF(lock != NULL,
2651                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2652                  lockh->cookie, req, aa);
2653
2654         /* Take an additional reference so that a blocking AST that
2655          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2656          * to arrive after an upcall has been executed by
2657          * osc_enqueue_fini(). */
2658         ldlm_lock_addref(lockh, mode);
2659
2660         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2661         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2662
2663         /* Let the CP AST grant the lock first. */
2664         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2665
2666         if (aa->oa_speculative) {
2667                 LASSERT(aa->oa_lvb == NULL);
2668                 LASSERT(aa->oa_flags == NULL);
2669                 aa->oa_flags = &flags;
2670         }
2671
2672         /* Complete the procedure for obtaining the lock. */
2673         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2674                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2675                                    lockh, rc);
2676         /* Complete the OSC-side processing. */
2677         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2678                               aa->oa_flags, aa->oa_speculative, rc);
2679
2680         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2681
2682         ldlm_lock_decref(lockh, mode);
2683         LDLM_LOCK_PUT(lock);
2684         RETURN(rc);
2685 }
2686
2687 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2688  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2689  * other synchronous requests; however, holding some locks while trying to
2690  * obtain others may take a considerable amount of time in the case of OST
2691  * failure, and when other sync requests cannot get a lock released from a
2692  * client, that client is evicted from the cluster -- such scenarios make life
2693  * difficult, so release locks just after they are obtained. */
2694 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2695                      __u64 *flags, union ldlm_policy_data *policy,
2696                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2697                      void *cookie, struct ldlm_enqueue_info *einfo,
2698                      struct ptlrpc_request_set *rqset, int async,
2699                      bool speculative)
2700 {
2701         struct obd_device *obd = exp->exp_obd;
2702         struct lustre_handle lockh = { 0 };
2703         struct ptlrpc_request *req = NULL;
2704         int intent = *flags & LDLM_FL_HAS_INTENT;
2705         __u64 match_flags = *flags;
2706         enum ldlm_mode mode;
2707         int rc;
2708         ENTRY;
2709
2710         /* Filesystem lock extents are extended to page boundaries so that
2711          * dealing with the page cache is a little smoother.  */
2712         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2713         policy->l_extent.end |= ~PAGE_MASK;
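        /*
         * Worked example (illustrative, assuming 4KB pages, i.e.
         * ~PAGE_MASK == 0xfff): a byte range [6000, 10000] becomes
         * [4096, 12287] -- the start is rounded down and the end
         * rounded up to page boundaries.
         */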
2714
2715         /* Next, search for already existing extent locks that will cover us */
2716         /* If we're trying to read, we also search for an existing PW lock.  The
2717          * VFS and page cache already protect us locally, so lots of readers/
2718          * writers can share a single PW lock.
2719          *
2720          * There are problems with conversion deadlocks, so instead of
2721          * converting a read lock to a write lock, we'll just enqueue a new
2722          * one.
2723          *
2724          * At some point we should cancel the read lock instead of making them
2725          * send us a blocking callback, but there are problems with canceling
2726          * locks out from other users right now, too. */
2727         mode = einfo->ei_mode;
2728         if (einfo->ei_mode == LCK_PR)
2729                 mode |= LCK_PW;
2730         /* Normal lock requests must wait for the LVB to be ready before
2731          * matching a lock; speculative lock requests do not need to,
2732          * because they will not actually use the lock. */
2733         if (!speculative)
2734                 match_flags |= LDLM_FL_LVB_READY;
2735         if (intent != 0)
2736                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2737         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2738                                einfo->ei_type, policy, mode, &lockh);
2739         if (mode) {
2740                 struct ldlm_lock *matched;
2741
2742                 if (*flags & LDLM_FL_TEST_LOCK)
2743                         RETURN(ELDLM_OK);
2744
2745                 matched = ldlm_handle2lock(&lockh);
2746                 if (speculative) {
2747                         /* This DLM lock request is speculative, and does not
2748                          * have an associated IO request. Therefore, if there
2749                          * is already a DLM lock, it will just inform the
2750                          * caller to cancel the request for this stripe. */
2751                         lock_res_and_lock(matched);
2752                         if (ldlm_extent_equal(&policy->l_extent,
2753                             &matched->l_policy_data.l_extent))
2754                                 rc = -EEXIST;
2755                         else
2756                                 rc = -ECANCELED;
2757                         unlock_res_and_lock(matched);
2758
2759                         ldlm_lock_decref(&lockh, mode);
2760                         LDLM_LOCK_PUT(matched);
2761                         RETURN(rc);
2762                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2763                         *flags |= LDLM_FL_LVB_READY;
2764
2765                         /* We already have a lock, and it's referenced. */
2766                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2767
2768                         ldlm_lock_decref(&lockh, mode);
2769                         LDLM_LOCK_PUT(matched);
2770                         RETURN(ELDLM_OK);
2771                 } else {
2772                         ldlm_lock_decref(&lockh, mode);
2773                         LDLM_LOCK_PUT(matched);
2774                 }
2775         }
2776
2777         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2778                 RETURN(-ENOLCK);
2779
2780         if (intent) {
2781                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2782                                            &RQF_LDLM_ENQUEUE_LVB);
2783                 if (req == NULL)
2784                         RETURN(-ENOMEM);
2785
2786                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2787                 if (rc) {
2788                         ptlrpc_request_free(req);
2789                         RETURN(rc);
2790                 }
2791
2792                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2793                                      sizeof(*lvb));
2794                 ptlrpc_request_set_replen(req);
2795         }
2796
2797         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2798         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2799
2800         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2801                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2802         if (async) {
2803                 if (!rc) {
2804                         struct osc_enqueue_args *aa;
2805                         aa = ptlrpc_req_async_args(aa, req);
2806                         aa->oa_exp         = exp;
2807                         aa->oa_mode        = einfo->ei_mode;
2808                         aa->oa_type        = einfo->ei_type;
2809                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2810                         aa->oa_upcall      = upcall;
2811                         aa->oa_cookie      = cookie;
2812                         aa->oa_speculative = speculative;
2813                         if (!speculative) {
2814                                 aa->oa_flags  = flags;
2815                                 aa->oa_lvb    = lvb;
2816                         } else {
2817                                 /* speculative locks essentially enqueue
2818                                  * a DLM lock in advance, so we don't care
2819                                  * about the result of the enqueue. */
2820                                 aa->oa_lvb    = NULL;
2821                                 aa->oa_flags  = NULL;
2822                         }
2823
2824                         req->rq_interpret_reply = osc_enqueue_interpret;
2825                         ptlrpc_set_add_req(rqset, req);
2826                 } else if (intent) {
2827                         ptlrpc_req_finished(req);
2828                 }
2829                 RETURN(rc);
2830         }
2831
2832         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2833                               flags, speculative, rc);
2834         if (intent)
2835                 ptlrpc_req_finished(req);
2836
2837         RETURN(rc);
2838 }
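/* A minimal sketch of an asynchronous caller of osc_enqueue_base(); the
 * names "my_upcall" and "my_cookie" are hypothetical, not from this file.
 * The upcall runs from osc_enqueue_interpret() once the reply arrives:
 *
 *      static int my_upcall(void *cookie, struct lustre_handle *lockh,
 *                           int errcode)
 *      {
 *              // ELDLM_OK or ELDLM_LOCK_MATCHED mean we hold the lock
 *              return 0;
 *      }
 *
 *      rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb,
 *                            my_upcall, my_cookie, &einfo, rqset,
 *                            1, false);     (async = 1, not speculative)
 */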
2839
2840 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2841                    struct ldlm_res_id *res_id, enum ldlm_type type,
2842                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2843                    __u64 *flags, struct osc_object *obj,
2844                    struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2845 {
2846         struct obd_device *obd = exp->exp_obd;
2847         __u64 lflags = *flags;
2848         enum ldlm_mode rc;
2849         ENTRY;
2850
2851         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2852                 RETURN(-EIO);
2853
2854         /* Filesystem lock extents are extended to page boundaries so that
2855          * dealing with the page cache is a little smoother */
2856         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2857         policy->l_extent.end |= ~PAGE_MASK;
2858
2859         /* Next, search for already existing extent locks that will cover us */
2860         /* If we're trying to read, we also search for an existing PW lock.  The
2861          * VFS and page cache already protect us locally, so lots of readers/
2862          * writers can share a single PW lock. */
2863         rc = mode;
2864         if (mode == LCK_PR)
2865                 rc |= LCK_PW;
2866
2867         rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2868                                         res_id, type, policy, rc, lockh,
2869                                         match_flags);
2870         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2871                 RETURN(rc);
2872
2873         if (obj != NULL) {
2874                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2875
2876                 LASSERT(lock != NULL);
2877                 if (osc_set_lock_data(lock, obj)) {
2878                         lock_res_and_lock(lock);
2879                         if (!ldlm_is_lvb_cached(lock)) {
2880                                 LASSERT(lock->l_ast_data == obj);
2881                                 osc_lock_lvb_update(env, obj, lock, NULL);
2882                                 ldlm_set_lvb_cached(lock);
2883                         }
2884                         unlock_res_and_lock(lock);
2885                 } else {
2886                         ldlm_lock_decref(lockh, rc);
2887                         rc = 0;
2888                 }
2889                 LDLM_LOCK_PUT(lock);
2890         }
2891         RETURN(rc);
2892 }
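/* Illustrative only: a caller can probe for an already-granted lock
 * without keeping it referenced by passing LDLM_FL_TEST_LOCK
 * (hypothetical variables, and assuming the usual LDLM behaviour that
 * TEST_LOCK matches return before a reference is handed back):
 *
 *      __u64 flags = LDLM_FL_TEST_LOCK;
 *
 *      mode = osc_match_base(env, exp, &res_id, LDLM_EXTENT, &policy,
 *                            LCK_PR, &flags, NULL, &lockh, 0);
 *      // mode != 0 means a covering PR or PW lock exists
 */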
2893
2894 static int osc_statfs_interpret(const struct lu_env *env,
2895                                 struct ptlrpc_request *req, void *args, int rc)
2896 {
2897         struct osc_async_args *aa = args;
2898         struct obd_statfs *msfs;
2899
2900         ENTRY;
2901         if (rc == -EBADR)
2902                 /*
2903                  * The request has in fact never been sent due to issues at
2904                  * a higher level (LOV).  Exit immediately since the caller
2905                  * is aware of the problem and takes care of the cleanup.
2906                  */
2907                 RETURN(rc);
2908
2909         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2910             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2911                 GOTO(out, rc = 0);
2912
2913         if (rc != 0)
2914                 GOTO(out, rc);
2915
2916         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2917         if (msfs == NULL)
2918                 GOTO(out, rc = -EPROTO);
2919
2920         *aa->aa_oi->oi_osfs = *msfs;
2921 out:
2922         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2923
2924         RETURN(rc);
2925 }
2926
2927 static int osc_statfs_async(struct obd_export *exp,
2928                             struct obd_info *oinfo, time64_t max_age,
2929                             struct ptlrpc_request_set *rqset)
2930 {
2931         struct obd_device     *obd = class_exp2obd(exp);
2932         struct ptlrpc_request *req;
2933         struct osc_async_args *aa;
2934         int rc;
2935         ENTRY;
2936
2937         if (obd->obd_osfs_age >= max_age) {
2938                 CDEBUG(D_SUPER,
2939                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2940                        obd->obd_name, &obd->obd_osfs,
2941                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2942                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2943                 spin_lock(&obd->obd_osfs_lock);
2944                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2945                 spin_unlock(&obd->obd_osfs_lock);
2946                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2947                 if (oinfo->oi_cb_up)
2948                         oinfo->oi_cb_up(oinfo, 0);
2949
2950                 RETURN(0);
2951         }
2952
2953         /* We could possibly pass max_age in the request (as an absolute
2954          * timestamp or a "seconds.usec ago") so the target can avoid doing
2955          * extra calls into the filesystem if that isn't necessary (e.g.
2956          * during mount that would help a bit).  Having relative timestamps
2957          * is not so great if request processing is slow, while absolute
2958          * timestamps are not ideal because they need time synchronization. */
2959         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2960         if (req == NULL)
2961                 RETURN(-ENOMEM);
2962
2963         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2964         if (rc) {
2965                 ptlrpc_request_free(req);
2966                 RETURN(rc);
2967         }
2968         ptlrpc_request_set_replen(req);
2969         req->rq_request_portal = OST_CREATE_PORTAL;
2970         ptlrpc_at_set_req_timeout(req);
2971
2972         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2973                 /* procfs requests should not wait for stats, to avoid deadlock */
2974                 req->rq_no_resend = 1;
2975                 req->rq_no_delay = 1;
2976         }
2977
2978         req->rq_interpret_reply = osc_statfs_interpret;
2979         aa = ptlrpc_req_async_args(aa, req);
2980         aa->aa_oi = oinfo;
2981
2982         ptlrpc_set_add_req(rqset, req);
2983         RETURN(0);
2984 }
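/* Sketch of the decision above (illustrative): callers typically pass
 * max_age = ktime_get_seconds() - some_staleness_bound, so any obd_osfs
 * refreshed since then is served from cache and oi_cb_up() runs
 * synchronously; otherwise an OST_STATFS RPC is queued on rqset and the
 * callback runs later from osc_statfs_interpret().
 */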
2985
2986 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2987                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2988 {
2989         struct obd_device     *obd = class_exp2obd(exp);
2990         struct obd_statfs     *msfs;
2991         struct ptlrpc_request *req;
2992         struct obd_import     *imp = NULL;
2993         int rc;
2994         ENTRY;
2995
2996
2997         /* Since the request might also come from lprocfs, we need to
2998          * sync this with client_disconnect_export() (Bug 15684). */
2999         down_read(&obd->u.cli.cl_sem);
3000         if (obd->u.cli.cl_import)
3001                 imp = class_import_get(obd->u.cli.cl_import);
3002         up_read(&obd->u.cli.cl_sem);
3003         if (!imp)
3004                 RETURN(-ENODEV);
3005
3006         /* We could possibly pass max_age in the request (as an absolute
3007          * timestamp or a "seconds.usec ago") so the target can avoid doing
3008          * extra calls into the filesystem if that isn't necessary (e.g.
3009          * during mount that would help a bit).  Having relative timestamps
3010          * is not so great if request processing is slow, while absolute
3011          * timestamps are not ideal because they need time synchronization. */
3012         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3013
3014         class_import_put(imp);
3015
3016         if (req == NULL)
3017                 RETURN(-ENOMEM);
3018
3019         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3020         if (rc) {
3021                 ptlrpc_request_free(req);
3022                 RETURN(rc);
3023         }
3024         ptlrpc_request_set_replen(req);
3025         req->rq_request_portal = OST_CREATE_PORTAL;
3026         ptlrpc_at_set_req_timeout(req);
3027
3028         if (flags & OBD_STATFS_NODELAY) {
3029                 /* procfs requests should not wait for stats, to avoid deadlock */
3030                 req->rq_no_resend = 1;
3031                 req->rq_no_delay = 1;
3032         }
3033
3034         rc = ptlrpc_queue_wait(req);
3035         if (rc)
3036                 GOTO(out, rc);
3037
3038         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3039         if (msfs == NULL)
3040                 GOTO(out, rc = -EPROTO);
3041
3042         *osfs = *msfs;
3043
3044         EXIT;
3045 out:
3046         ptlrpc_req_finished(req);
3047         return rc;
3048 }
3049
3050 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3051                          void *karg, void __user *uarg)
3052 {
3053         struct obd_device *obd = exp->exp_obd;
3054         struct obd_ioctl_data *data = karg;
3055         int rc = 0;
3056
3057         ENTRY;
3058         if (!try_module_get(THIS_MODULE)) {
3059                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3060                        module_name(THIS_MODULE));
3061                 return -EINVAL;
3062         }
3063         switch (cmd) {
3064         case OBD_IOC_CLIENT_RECOVER:
3065                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3066                                            data->ioc_inlbuf1, 0);
3067                 if (rc > 0)
3068                         rc = 0;
3069                 break;
3070         case IOC_OSC_SET_ACTIVE:
3071                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3072                                               data->ioc_offset);
3073                 break;
3074         default:
3075                 rc = -ENOTTY;
3076                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3077                        obd->obd_name, cmd, current->comm, rc);
3078                 break;
3079         }
3080
3081         module_put(THIS_MODULE);
3082         return rc;
3083 }
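/* Illustrative only: the recognised commands normally arrive from
 * userspace tools (e.g. lctl) via the OBD ioctl path; a kernel-side
 * caller would look roughly like this (the "data" setup is a
 * hypothetical sketch):
 *
 *      struct obd_ioctl_data data = { 0 };
 *
 *      data.ioc_inlbuf1 = uuid_string;      connection to recover
 *      rc = obd_iocontrol(OBD_IOC_CLIENT_RECOVER, exp, sizeof(data),
 *                         &data, NULL);
 */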
3084
3085 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3086                        u32 keylen, void *key, u32 vallen, void *val,
3087                        struct ptlrpc_request_set *set)
3088 {
3089         struct ptlrpc_request *req;
3090         struct obd_device     *obd = exp->exp_obd;
3091         struct obd_import     *imp = class_exp2cliimp(exp);
3092         char                  *tmp;
3093         int                    rc;
3094         ENTRY;
3095
3096         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3097
3098         if (KEY_IS(KEY_CHECKSUM)) {
3099                 if (vallen != sizeof(int))
3100                         RETURN(-EINVAL);
3101                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3102                 RETURN(0);
3103         }
3104
3105         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3106                 sptlrpc_conf_client_adapt(obd);
3107                 RETURN(0);
3108         }
3109
3110         if (KEY_IS(KEY_FLUSH_CTX)) {
3111                 sptlrpc_import_flush_my_ctx(imp);
3112                 RETURN(0);
3113         }
3114
3115         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3116                 struct client_obd *cli = &obd->u.cli;
3117                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3118                 long target = *(long *)val;
3119
3120                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3121                 *(long *)val -= nr;
3122                 RETURN(0);
3123         }
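        /*
         * Worked example for the LRU shrink above (illustrative): with
         * 100 pages in cl_lru_in_list and *val == 30, nr starts at 50
         * (half the list), is clamped to min(50, 30) == 30, and *val is
         * then decremented by however many pages osc_lru_shrink()
         * actually freed, telling the caller how much remains to shrink.
         */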
3124
3125         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3126                 RETURN(-EINVAL);
3127
3128         /* We pass all other commands directly to the OST. Since nobody calls
3129            osc methods directly and everybody is supposed to go through LOV, we
3130            assume LOV has checked invalid values for us.
3131            The only recognised values so far are evict_by_nid and mds_conn.
3132            Even if something bad goes through, we'd get a -EINVAL from the OST
3133            anyway. */
3134
3135         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3136                                                 &RQF_OST_SET_GRANT_INFO :
3137                                                 &RQF_OBD_SET_INFO);
3138         if (req == NULL)
3139                 RETURN(-ENOMEM);
3140
3141         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3142                              RCL_CLIENT, keylen);
3143         if (!KEY_IS(KEY_GRANT_SHRINK))
3144                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3145                                      RCL_CLIENT, vallen);
3146         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3147         if (rc) {
3148                 ptlrpc_request_free(req);
3149                 RETURN(rc);
3150         }
3151
3152         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3153         memcpy(tmp, key, keylen);
3154         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3155                                                         &RMF_OST_BODY :
3156                                                         &RMF_SETINFO_VAL);
3157         memcpy(tmp, val, vallen);
3158
3159         if (KEY_IS(KEY_GRANT_SHRINK)) {
3160                 struct osc_grant_args *aa;
3161                 struct obdo *oa;
3162
3163                 aa = ptlrpc_req_async_args(aa, req);
3164                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3165                 if (!oa) {
3166                         ptlrpc_req_finished(req);
3167                         RETURN(-ENOMEM);
3168                 }
3169                 *oa = ((struct ost_body *)val)->oa;
3170                 aa->aa_oa = oa;
3171                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3172         }
3173
3174         ptlrpc_request_set_replen(req);
3175         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3176                 LASSERT(set != NULL);
3177                 ptlrpc_set_add_req(set, req);
3178                 ptlrpc_check_set(NULL, set);
3179         } else {
3180                 ptlrpcd_add_req(req);
3181         }
3182
3183         RETURN(0);
3184 }
3185 EXPORT_SYMBOL(osc_set_info_async);
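/* A minimal sketch of a purely local key (no RPC, so a NULL set is
 * fine), assuming the caller holds a valid export; KEY_CHECKSUM toggles
 * checksumming without contacting the OST:
 *
 *      int on = 1;
 *
 *      rc = osc_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
 *                              KEY_CHECKSUM, sizeof(on), &on, NULL);
 */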
3186
3187 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3188                   struct obd_device *obd, struct obd_uuid *cluuid,
3189                   struct obd_connect_data *data, void *localdata)
3190 {
3191         struct client_obd *cli = &obd->u.cli;
3192
3193         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3194                 long lost_grant;
3195                 long grant;
3196
3197                 spin_lock(&cli->cl_loi_list_lock);
3198                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3199                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3200                         /* restore ocd_grant_blkbits as client page bits */
3201                         data->ocd_grant_blkbits = PAGE_SHIFT;
3202                         grant += cli->cl_dirty_grant;
3203                 } else {
3204                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3205                 }
3206                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3207                 lost_grant = cli->cl_lost_grant;
3208                 cli->cl_lost_grant = 0;
3209                 spin_unlock(&cli->cl_loi_list_lock);
3210
3211                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3212                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3213                        data->ocd_version, data->ocd_grant, lost_grant);
3214         }
3215
3216         RETURN(0);
3217 }
3218 EXPORT_SYMBOL(osc_reconnect);
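/* The grant arithmetic above, illustratively: a client with 4MB of
 * available grant, 1MB reserved and 16 dirty 4KB pages (and no
 * OBD_CONNECT_GRANT_PARAM) asks to keep 4MB + 1MB + 16 * 4KB across the
 * reconnect; a client holding no grant at all asks for twice the BRW
 * size as a starting point.
 */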
3219
3220 int osc_disconnect(struct obd_export *exp)
3221 {
3222         struct obd_device *obd = class_exp2obd(exp);
3223         int rc;
3224
3225         rc = client_disconnect_export(exp);
3226         /**
3227          * Initially we put del_shrink_grant before disconnect_export, but it
3228          * causes the following problem if setup (connect) and cleanup
3229          * (disconnect) are tangled together.
3230          *      connect p1                     disconnect p2
3231          *   ptlrpc_connect_import
3232          *     ...............               class_manual_cleanup
3233          *                                     osc_disconnect
3234          *                                     del_shrink_grant
3235          *   ptlrpc_connect_interrupt
3236          *     osc_init_grant
3237          *   add this client to shrink list
3238          *                                      cleanup_osc
3239          * Bang! The grant shrink thread triggers the shrink. (Bug 18662)
3240          */
3241         osc_del_grant_list(&obd->u.cli);
3242         return rc;
3243 }
3244 EXPORT_SYMBOL(osc_disconnect);
3245
3246 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3247                                  struct hlist_node *hnode, void *arg)
3248 {
3249         struct lu_env *env = arg;
3250         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3251         struct ldlm_lock *lock;
3252         struct osc_object *osc = NULL;
3253         ENTRY;
3254
3255         lock_res(res);
3256         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3257                 if (lock->l_ast_data != NULL && osc == NULL) {
3258                         osc = lock->l_ast_data;
3259                         cl_object_get(osc2cl(osc));
3260                 }
3261
3262                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3263                  * by the 2nd round of the ldlm_namespace_cleanup() call in
3264                  * osc_import_event(). */
3265                 ldlm_clear_cleaned(lock);
3266         }
3267         unlock_res(res);
3268
3269         if (osc != NULL) {
3270                 osc_object_invalidate(env, osc);
3271                 cl_object_put(env, osc2cl(osc));
3272         }
3273
3274         RETURN(0);
3275 }
3276 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3277
3278 static int osc_import_event(struct obd_device *obd,
3279                             struct obd_import *imp,
3280                             enum obd_import_event event)
3281 {
3282         struct client_obd *cli;
3283         int rc = 0;
3284
3285         ENTRY;
3286         LASSERT(imp->imp_obd == obd);
3287
3288         switch (event) {
3289         case IMP_EVENT_DISCON: {
3290                 cli = &obd->u.cli;
3291                 spin_lock(&cli->cl_loi_list_lock);
3292                 cli->cl_avail_grant = 0;
3293                 cli->cl_lost_grant = 0;
3294                 spin_unlock(&cli->cl_loi_list_lock);
3295                 break;
3296         }
3297         case IMP_EVENT_INACTIVE: {
3298                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3299                 break;
3300         }
3301         case IMP_EVENT_INVALIDATE: {
3302                 struct ldlm_namespace *ns = obd->obd_namespace;
3303                 struct lu_env         *env;
3304                 __u16                  refcheck;
3305
3306                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3307
3308                 env = cl_env_get(&refcheck);
3309                 if (!IS_ERR(env)) {
3310                         osc_io_unplug(env, &obd->u.cli, NULL);
3311
3312                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3313                                                  osc_ldlm_resource_invalidate,
3314                                                  env, 0);
3315                         cl_env_put(env, &refcheck);
3316
3317                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3318                 } else
3319                         rc = PTR_ERR(env);
3320                 break;
3321         }
3322         case IMP_EVENT_ACTIVE: {
3323                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3324                 break;
3325         }
3326         case IMP_EVENT_OCD: {
3327                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3328
3329                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3330                         osc_init_grant(&obd->u.cli, ocd);
3331
3332                 /* See bug 7198 */
3333                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3334                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3335
3336                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3337                 break;
3338         }
3339         case IMP_EVENT_DEACTIVATE: {
3340                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3341                 break;
3342         }
3343         case IMP_EVENT_ACTIVATE: {
3344                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3345                 break;
3346         }
3347         default:
3348                 CERROR("Unknown import event %d\n", event);
3349                 LBUG();
3350         }
3351         RETURN(rc);
3352 }
3353
3354 /**
3355  * Determine whether the lock can be canceled before replaying the lock
3356  * during recovery, see bug16774 for detailed information.
3357  *
3358  * \retval zero the lock can't be canceled
3359  * \retval other ok to cancel
3360  */
3361 static int osc_cancel_weight(struct ldlm_lock *lock)
3362 {
3363         /*
3364          * Cancel all unused, granted extent locks.
3365          */
3366         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3367             ldlm_is_granted(lock) &&
3368             osc_ldlm_weigh_ast(lock) == 0)
3369                 RETURN(1);
3370
3371         RETURN(0);
3372 }
3373
3374 static int brw_queue_work(const struct lu_env *env, void *data)
3375 {
3376         struct client_obd *cli = data;
3377
3378         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3379
3380         osc_io_unplug(env, cli, NULL);
3381         RETURN(0);
3382 }
3383
3384 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3385 {
3386         struct client_obd *cli = &obd->u.cli;
3387         void *handler;
3388         int rc;
3389
3390         ENTRY;
3391
3392         rc = ptlrpcd_addref();
3393         if (rc)
3394                 RETURN(rc);
3395
3396         rc = client_obd_setup(obd, lcfg);
3397         if (rc)
3398                 GOTO(out_ptlrpcd, rc);
3399
3400
3401         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3402         if (IS_ERR(handler))
3403                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3404         cli->cl_writeback_work = handler;
3405
3406         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3407         if (IS_ERR(handler))
3408                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3409         cli->cl_lru_work = handler;
3410
3411         rc = osc_quota_setup(obd);
3412         if (rc)
3413                 GOTO(out_ptlrpcd_work, rc);
3414
3415         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3416         osc_update_next_shrink(cli);
3417
3418         RETURN(rc);
3419
3420 out_ptlrpcd_work:
3421         if (cli->cl_writeback_work != NULL) {
3422                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3423                 cli->cl_writeback_work = NULL;
3424         }
3425         if (cli->cl_lru_work != NULL) {
3426                 ptlrpcd_destroy_work(cli->cl_lru_work);
3427                 cli->cl_lru_work = NULL;
3428         }
3429         client_obd_cleanup(obd);
3430 out_ptlrpcd:
3431         ptlrpcd_decref();
3432         RETURN(rc);
3433 }
3434 EXPORT_SYMBOL(osc_setup_common);
3435
3436 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3437 {
3438         struct client_obd *cli = &obd->u.cli;
3439         int                adding;
3440         int                added;
3441         int                req_count;
3442         int                rc;
3443
3444         ENTRY;
3445
3446         rc = osc_setup_common(obd, lcfg);
3447         if (rc < 0)
3448                 RETURN(rc);
3449
3450         rc = osc_tunables_init(obd);
3451         if (rc)
3452                 RETURN(rc);
3453
3454         /*
3455          * We try to control the total number of requests with an upper limit
3456          * osc_reqpool_maxreqcount. There might be some race which will cause
3457          * over-limit allocation, but it is fine.
3458          */
3459         req_count = atomic_read(&osc_pool_req_count);
3460         if (req_count < osc_reqpool_maxreqcount) {
3461                 adding = cli->cl_max_rpcs_in_flight + 2;
3462                 if (req_count + adding > osc_reqpool_maxreqcount)
3463                         adding = osc_reqpool_maxreqcount - req_count;
3464
3465                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3466                 atomic_add(added, &osc_pool_req_count);
3467         }
3468
3469         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3470
3471         spin_lock(&osc_shrink_lock);
3472         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3473         spin_unlock(&osc_shrink_lock);
3474         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3475         cli->cl_import->imp_idle_debug = D_HA;
3476
3477         RETURN(0);
3478 }
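/* Worked example for the request-pool top-up above (illustrative): with
 * osc_reqpool_maxreqcount == 512, 505 requests already pooled and
 * cl_max_rpcs_in_flight == 8, "adding" starts at 8 + 2 = 10 but is
 * clamped to 512 - 505 = 7.  The window between atomic_read() and
 * atomic_add() can overshoot the limit slightly, which the comment in
 * osc_setup() accepts as harmless.
 */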
3479
3480 int osc_precleanup_common(struct obd_device *obd)
3481 {
3482         struct client_obd *cli = &obd->u.cli;
3483         ENTRY;
3484
3485         /* LU-464
3486          * for the echo client, the export may be on the zombie list; wait
3487          * for the zombie thread to cull it, because cli.cl_import will be
3488          * cleared in client_disconnect_export():
3489          *   class_export_destroy() -> obd_cleanup() ->
3490          *   echo_device_free() -> echo_client_cleanup() ->
3491          *   obd_disconnect() -> osc_disconnect() ->
3492          *   client_disconnect_export()
3493          */
3494         obd_zombie_barrier();
3495         if (cli->cl_writeback_work) {
3496                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3497                 cli->cl_writeback_work = NULL;
3498         }
3499
3500         if (cli->cl_lru_work) {
3501                 ptlrpcd_destroy_work(cli->cl_lru_work);
3502                 cli->cl_lru_work = NULL;
3503         }
3504
3505         obd_cleanup_client_import(obd);
3506         RETURN(0);
3507 }
3508 EXPORT_SYMBOL(osc_precleanup_common);
3509
3510 static int osc_precleanup(struct obd_device *obd)
3511 {
3512         ENTRY;
3513
3514         osc_precleanup_common(obd);
3515
3516         ptlrpc_lprocfs_unregister_obd(obd);
3517         RETURN(0);
3518 }
3519
3520 int osc_cleanup_common(struct obd_device *obd)
3521 {
3522         struct client_obd *cli = &obd->u.cli;
3523         int rc;
3524
3525         ENTRY;
3526
3527         spin_lock(&osc_shrink_lock);
3528         list_del(&cli->cl_shrink_list);
3529         spin_unlock(&osc_shrink_lock);
3530
3531         /* lru cleanup */
3532         if (cli->cl_cache != NULL) {
3533                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3534                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3535                 list_del_init(&cli->cl_lru_osc);
3536                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3537                 cli->cl_lru_left = NULL;
3538                 cl_cache_decref(cli->cl_cache);
3539                 cli->cl_cache = NULL;
3540         }
3541
3542         /* free memory of osc quota cache */
3543         osc_quota_cleanup(obd);
3544
3545         rc = client_obd_cleanup(obd);
3546
3547         ptlrpcd_decref();
3548         RETURN(rc);
3549 }
3550 EXPORT_SYMBOL(osc_cleanup_common);
3551
3552 static const struct obd_ops osc_obd_ops = {
3553         .o_owner                = THIS_MODULE,
3554         .o_setup                = osc_setup,
3555         .o_precleanup           = osc_precleanup,
3556         .o_cleanup              = osc_cleanup_common,
3557         .o_add_conn             = client_import_add_conn,
3558         .o_del_conn             = client_import_del_conn,
3559         .o_connect              = client_connect_import,
3560         .o_reconnect            = osc_reconnect,
3561         .o_disconnect           = osc_disconnect,
3562         .o_statfs               = osc_statfs,
3563         .o_statfs_async         = osc_statfs_async,
3564         .o_create               = osc_create,
3565         .o_destroy              = osc_destroy,
3566         .o_getattr              = osc_getattr,
3567         .o_setattr              = osc_setattr,
3568         .o_iocontrol            = osc_iocontrol,
3569         .o_set_info_async       = osc_set_info_async,
3570         .o_import_event         = osc_import_event,
3571         .o_quotactl             = osc_quotactl,
3572 };
3573
3574 static struct shrinker *osc_cache_shrinker;
3575 LIST_HEAD(osc_shrink_list);
3576 DEFINE_SPINLOCK(osc_shrink_lock);
3577
3578 #ifndef HAVE_SHRINKER_COUNT
3579 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3580 {
3581         struct shrink_control scv = {
3582                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3583                 .gfp_mask   = shrink_param(sc, gfp_mask)
3584         };
3585         (void)osc_cache_shrink_scan(shrinker, &scv);
3586
3587         return osc_cache_shrink_count(shrinker, &scv);
3588 }
3589 #endif
3590
3591 static int __init osc_init(void)
3592 {
3593         unsigned int reqpool_size;
3594         unsigned int reqsize;
3595         int rc;
3596         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3597                          osc_cache_shrink_count, osc_cache_shrink_scan);
3598         ENTRY;
3599
3600         /* print the address of _any_ initialized kernel symbol from this
3601          * module, to allow debugging with a gdb that doesn't support data
3602          * symbols from modules. */
3603         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3604
3605         rc = lu_kmem_init(osc_caches);
3606         if (rc)
3607                 RETURN(rc);
3608
3609         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3610                                  LUSTRE_OSC_NAME, &osc_device_type);
3611         if (rc)
3612                 GOTO(out_kmem, rc);
3613
3614         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3615
3616         /* This is obviously too much memory; we only prevent overflow here */
3617         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3618                 GOTO(out_type, rc = -EINVAL);
3619
3620         reqpool_size = osc_reqpool_mem_max << 20;
3621
3622         reqsize = 1;
3623         while (reqsize < OST_IO_MAXREQSIZE)
3624                 reqsize = reqsize << 1;
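        /*
         * reqsize is now OST_IO_MAXREQSIZE rounded up to a power of two;
         * e.g. with a hypothetical 5MB maximum request size the loop
         * yields 8MB, so the request count computed below errs on the
         * conservative side.
         */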
3625
3626         /*
3627          * We don't enlarge the request count in OSC pool according to
3628          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3629          * tried after a normal allocation has failed, so a small OSC pool
3630          * won't cause much performance degradation in most cases.
3631          */
3632         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3633
3634         atomic_set(&osc_pool_req_count, 0);
3635         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3636                                           ptlrpc_add_rqs_to_pool);
3637
3638         if (osc_rq_pool == NULL)
3639                 GOTO(out_type, rc = -ENOMEM);
3640
3641         rc = osc_start_grant_work();
3642         if (rc != 0)
3643                 GOTO(out_req_pool, rc);
3644
3645         RETURN(rc);
3646
3647 out_req_pool:
3648         ptlrpc_free_rq_pool(osc_rq_pool);
3649 out_type:
3650         class_unregister_type(LUSTRE_OSC_NAME);
3651 out_kmem:
3652         lu_kmem_fini(osc_caches);
3653
3654         RETURN(rc);
3655 }
3656
3657 static void __exit osc_exit(void)
3658 {
3659         osc_stop_grant_work();
3660         remove_shrinker(osc_cache_shrinker);
3661         class_unregister_type(LUSTRE_OSC_NAME);
3662         lu_kmem_fini(osc_caches);
3663         ptlrpc_free_rq_pool(osc_rq_pool);
3664 }
3665
3666 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3667 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3668 MODULE_VERSION(LUSTRE_VERSION_STRING);
3669 MODULE_LICENSE("GPL");
3670
3671 module_init(osc_init);
3672 module_exit(osc_exit);