/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

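/*
 * Request pool shared by all OSC devices.  BRW write requests are
 * allocated from this pool (see osc_brw_prep_request()) so that dirty
 * page writeback can still make forward progress under memory pressure.
 */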
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

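/*
 * Pack @oa into the OST body of @req in wire format, applying the
 * conversions required by the import's connect data.
 */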
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

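/*
 * Reply callback for OST_SETATTR (also reused by OST_PUNCH, see
 * osc_punch_send()): unpack the obdo returned by the server into sa_oa
 * and pass the result to the caller's upcall.
 */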
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

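/*
 * Send an OST_PUNCH request asynchronously via ptlrpcd.  The reply is
 * handled by osc_setattr_interpret(), which invokes @upcall with @cookie
 * once the server has responded.
 */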
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

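/*
 * Reply callback for OST_SYNC: copy the returned obdo back to the caller
 * and refresh the osc object's blocks attribute before invoking the
 * caller's upcall.
 */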
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource named
 * by @oa's object id. Found locks are added to the @cancels list. Returns
 * the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from the case when ELC is not supported at all,
         * where we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

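/*
 * Throttle destroy RPCs: optimistically bump cl_destroy_in_flight and
 * allow the send if we are still within cl_max_rpcs_in_flight; otherwise
 * roll the counter back.  The decrement may race with another thread's
 * increment, in which case a waiter is woken to re-check.
 */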
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of in-flight destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

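/*
 * Fill the grant-related fields of @oa (dirty, undirty, grant, dropped)
 * so that the client's cache and grant state is piggybacked to the server
 * on this RPC.
 */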
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, so they may race and trip this CERROR() unless we
                 * add a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take the extent tax into account when asking for
                         * more grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

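/*
 * Release available grant down to @target_bytes and return the excess to
 * the server via a KEY_GRANT_SHRINK set_info RPC.  If the available grant
 * is already at or below the target (or the target is below one RPC worth
 * of pages), nothing is sent.
 */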
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

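/*
 * Periodic work item: walk the registered clients, send up to
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, and re-arm the work for
 * the earliest pending next-shrink time.
 */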
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work for returning grant space to the server from idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

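/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply and verify
 * that the bulk transferred exactly the number of bytes we requested.
 */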
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

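/*
 * Two brw_pages can share one niobuf only if they are contiguous and
 * carry compatible flags; a handful of cache/sync flags are ignored for
 * the comparison.
 */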
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
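/*
 * Compute a T10-PI checksum over the pages: generate DIF guard tags for
 * each sector into a bounce page, then hash the accumulated guard tags
 * with the top-level checksum algorithm to produce *check_sum.
 */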
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

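/*
 * Build a BRW (bulk read/write) request for @page_count pages: pack the
 * OST body, ioobj and remote niobufs, set up the bulk descriptor (or copy
 * the data inline when the transfer qualifies as a short i/o), and
 * checksum the data for writes when client checksums are enabled.
 */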
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK),
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * oa contains valid o_uid and o_gid for these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set, in order to avoid
         * breaking other processing logic */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so that the actual maximum is a power-of-two number, not
         * one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of the page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = kmap_atomic(pg->pg);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        kunmap_atomic(ptr);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                        rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                  requested_nob, page_count,
                                                  pga, OST_WRITE,
                                                  &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                           cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
1513                          * resend but cl_checksum is no longer set. b=11238 */
1514                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1515                 }
1516                 oa->o_cksum = body->oa.o_cksum;
1517                 /* 1 RC per niobuf */
1518                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1519                                      sizeof(__u32) * niocount);
1520         } else {
1521                 if (cli->cl_checksum &&
1522                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1523                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1524                                 body->oa.o_flags = 0;
1525                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1526                                 cli->cl_cksum_type);
1527                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1528                 }
1529
1530                 /* The client cksum has already been copied to the wire obdo
1531                  * by the previous lustre_set_wire_obdo(), so in case a bulk
1532                  * read is resent due to a cksum error, this allows the server
1533                  * to check and dump the pages on its side. */
1534         }
1535         ptlrpc_request_set_replen(req);
1536
1537         aa = ptlrpc_req_async_args(aa, req);
1538         aa->aa_oa = oa;
1539         aa->aa_requested_nob = requested_nob;
1540         aa->aa_nio_count = niocount;
1541         aa->aa_page_count = page_count;
1542         aa->aa_resends = 0;
1543         aa->aa_ppga = pga;
1544         aa->aa_cli = cli;
1545         INIT_LIST_HEAD(&aa->aa_oaps);
1546
1547         *reqp = req;
1548         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1549         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1550                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1551                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1552         RETURN(0);
1553
1554  out:
1555         ptlrpc_req_finished(req);
1556         RETURN(rc);
1557 }
1558
1559 char dbgcksum_file_name[PATH_MAX];
1560
1561 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1562                                 struct brw_page **pga, __u32 server_cksum,
1563                                 __u32 client_cksum)
1564 {
1565         struct file *filp;
1566         int rc, i;
1567         unsigned int len;
1568         char *buf;
1569
1570         /* This will only keep a dump of the pages on the first error for the
1571          * same range in the file/fid, not during resends/retries. */
1572         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1573                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1574                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1575                   libcfs_debug_file_path_arr :
1576                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1577                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1578                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1579                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1580                  pga[0]->off,
1581                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1582                  client_cksum, server_cksum);
1583         filp = filp_open(dbgcksum_file_name,
1584                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1585         if (IS_ERR(filp)) {
1586                 rc = PTR_ERR(filp);
1587                 if (rc == -EEXIST)
1588                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1589                                "checksum error: rc = %d\n", dbgcksum_file_name,
1590                                rc);
1591                 else
1592                         CERROR("%s: can't open to dump pages with checksum "
1593                                "error: rc = %d\n", dbgcksum_file_name, rc);
1594                 return;
1595         }
1596
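             /* Write out each page of the bulk; cfs_kernel_write() may write
              * fewer bytes than requested, so loop until the whole page has
              * been written. */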
1597         for (i = 0; i < page_count; i++) {
1598                 len = pga[i]->count;
1599                 buf = kmap(pga[i]->pg);
1600                 while (len != 0) {
1601                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1602                         if (rc < 0) {
1603                                 CERROR("%s: wanted to write %u but got %d "
1604                                        "error\n", dbgcksum_file_name, len, rc);
1605                                 break;
1606                         }
1607                         len -= rc;
1608                         buf += rc;
1609                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1610                                dbgcksum_file_name, rc);
1611                 }
1612                 kunmap(pga[i]->pg);
1613         }
1614
1615         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1616         if (rc)
1617                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1618         filp_close(filp, NULL);
1619 }
1620
1621 static int
1622 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1623                      __u32 client_cksum, __u32 server_cksum,
1624                      struct osc_brw_async_args *aa)
1625 {
1626         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1627         enum cksum_types cksum_type;
1628         obd_dif_csum_fn *fn = NULL;
1629         int sector_size = 0;
1630         __u32 new_cksum;
1631         char *msg;
1632         int rc;
1633
1634         if (server_cksum == client_cksum) {
1635                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1636                 return 0;
1637         }
1638
1639         if (aa->aa_cli->cl_checksum_dump)
1640                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1641                                     server_cksum, client_cksum);
1642
1643         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1644                                            oa->o_flags : 0);
1645
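             /* Map the T10-PI checksum types to their guard function and
              * sector size; for any other type fn stays NULL and the generic
              * bulk checksum routine is used below instead. */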
1646         switch (cksum_type) {
1647         case OBD_CKSUM_T10IP512:
1648                 fn = obd_dif_ip_fn;
1649                 sector_size = 512;
1650                 break;
1651         case OBD_CKSUM_T10IP4K:
1652                 fn = obd_dif_ip_fn;
1653                 sector_size = 4096;
1654                 break;
1655         case OBD_CKSUM_T10CRC512:
1656                 fn = obd_dif_crc_fn;
1657                 sector_size = 512;
1658                 break;
1659         case OBD_CKSUM_T10CRC4K:
1660                 fn = obd_dif_crc_fn;
1661                 sector_size = 4096;
1662                 break;
1663         default:
1664                 break;
1665         }
1666
1667         if (fn)
1668                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1669                                              aa->aa_page_count, aa->aa_ppga,
1670                                              OST_WRITE, fn, sector_size,
1671                                              &new_cksum);
1672         else
1673                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1674                                        aa->aa_ppga, OST_WRITE, cksum_type,
1675                                        &new_cksum);
1676
1677         if (rc < 0)
1678                 msg = "failed to calculate the client write checksum";
1679         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1680                 msg = "the server did not use the checksum type specified in "
1681                       "the original request - likely a protocol problem";
1682         else if (new_cksum == server_cksum)
1683                 msg = "changed on the client after we checksummed it - "
1684                       "likely false positive due to mmap IO (bug 11742)";
1685         else if (new_cksum == client_cksum)
1686                 msg = "changed in transit before arrival at OST";
1687         else
1688                 msg = "changed in transit AND doesn't match the original - "
1689                       "likely false positive due to mmap IO (bug 11742)";
1690
1691         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1692                            DFID " object "DOSTID" extent [%llu-%llu], original "
1693                            "client csum %x (type %x), server csum %x (type %x),"
1694                            " client csum now %x\n",
1695                            obd_name, msg, libcfs_nid2str(peer->nid),
1696                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1697                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1698                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1699                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1700                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1701                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1702                            client_cksum,
1703                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1704                            server_cksum, cksum_type, new_cksum);
1705         return 1;
1706 }
1707
1708 /* Note: rc enters this function as the number of bytes transferred */
1709 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1710 {
1711         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1712         struct client_obd *cli = aa->aa_cli;
1713         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1714         const struct lnet_process_id *peer =
1715                 &req->rq_import->imp_connection->c_peer;
1716         struct ost_body *body;
1717         u32 client_cksum = 0;
1718
1719         ENTRY;
1720
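             /* -EDQUOT is not immediately fatal here: the reply still carries
              * quota and grant state that must be processed below before the
              * error is returned. */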
1721         if (rc < 0 && rc != -EDQUOT) {
1722                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1723                 RETURN(rc);
1724         }
1725
1726         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1727         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1728         if (body == NULL) {
1729                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1730                 RETURN(-EPROTO);
1731         }
1732
1733         /* set/clear over quota flag for a uid/gid/projid */
1734         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1735             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1736                 unsigned qid[LL_MAXQUOTAS] = {
1737                                          body->oa.o_uid, body->oa.o_gid,
1738                                          body->oa.o_projid };
1739                 CDEBUG(D_QUOTA,
1740                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1741                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1742                        body->oa.o_valid, body->oa.o_flags);
1743                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1744                                 body->oa.o_flags);
1745         }
1746
1747         osc_update_grant(cli, body);
1748
1749         if (rc < 0)
1750                 RETURN(rc);
1751
1752         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1753                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1754
1755         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1756                 if (rc > 0) {
1757                         CERROR("%s: unexpected positive size %d\n",
1758                                obd_name, rc);
1759                         RETURN(-EPROTO);
1760                 }
1761
1762                 if (req->rq_bulk != NULL &&
1763                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1764                         RETURN(-EAGAIN);
1765
1766                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1767                     check_write_checksum(&body->oa, peer, client_cksum,
1768                                          body->oa.o_cksum, aa))
1769                         RETURN(-EAGAIN);
1770
1771                 rc = check_write_rcs(req, aa->aa_requested_nob,
1772                                      aa->aa_nio_count, aa->aa_page_count,
1773                                      aa->aa_ppga);
1774                 GOTO(out, rc);
1775         }
1776
1777         /* The rest of this function executes only for OST_READs */
1778
1779         if (req->rq_bulk == NULL) {
1780                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1781                                           RCL_SERVER);
1782                 LASSERT(rc == req->rq_status);
1783         } else {
1784                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1785                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1786         }
1787         if (rc < 0)
1788                 GOTO(out, rc = -EAGAIN);
1789
1790         if (rc > aa->aa_requested_nob) {
1791                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1792                        rc, aa->aa_requested_nob);
1793                 RETURN(-EPROTO);
1794         }
1795
1796         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1797                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1798                        rc, req->rq_bulk->bd_nob_transferred);
1799                 RETURN(-EPROTO);
1800         }
1801
1802         if (req->rq_bulk == NULL) {
1803                 /* short io */
1804                 int nob, pg_count, i = 0;
1805                 unsigned char *buf;
1806
1807                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1808                 pg_count = aa->aa_page_count;
1809                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1810                                                    rc);
1811                 nob = rc;
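                     /* Scatter the inline reply buffer back into the client
                      * pages, at most ->count bytes per page. */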
1812                 while (nob > 0 && pg_count > 0) {
1813                         unsigned char *ptr;
1814                         int count = aa->aa_ppga[i]->count > nob ?
1815                                     nob : aa->aa_ppga[i]->count;
1816
1817                         CDEBUG(D_CACHE, "page %p count %d\n",
1818                                aa->aa_ppga[i]->pg, count);
1819                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1820                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1821                                count);
1822                         kunmap_atomic((void *) ptr);
1823
1824                         buf += count;
1825                         nob -= count;
1826                         i++;
1827                         pg_count--;
1828                 }
1829         }
1830
1831         if (rc < aa->aa_requested_nob)
1832                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1833
1834         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1835                 static int cksum_counter;
1836                 u32        server_cksum = body->oa.o_cksum;
1837                 char      *via = "";
1838                 char      *router = "";
1839                 enum cksum_types cksum_type;
1840                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1841                         body->oa.o_flags : 0;
1842
1843                 cksum_type = obd_cksum_type_unpack(o_flags);
1844                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1845                                           aa->aa_page_count, aa->aa_ppga,
1846                                           OST_READ, &client_cksum);
1847                 if (rc < 0)
1848                         GOTO(out, rc);
1849
1850                 if (req->rq_bulk != NULL &&
1851                     peer->nid != req->rq_bulk->bd_sender) {
1852                         via = " via ";
1853                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1854                 }
1855
1856                 if (server_cksum != client_cksum) {
1857                         struct ost_body *clbody;
1858                         u32 page_count = aa->aa_page_count;
1859
1860                         clbody = req_capsule_client_get(&req->rq_pill,
1861                                                         &RMF_OST_BODY);
1862                         if (cli->cl_checksum_dump)
1863                                 dump_all_bulk_pages(&clbody->oa, page_count,
1864                                                     aa->aa_ppga, server_cksum,
1865                                                     client_cksum);
1866
1867                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1868                                            "%s%s%s inode "DFID" object "DOSTID
1869                                            " extent [%llu-%llu], client %x, "
1870                                            "server %x, cksum_type %x\n",
1871                                            obd_name,
1872                                            libcfs_nid2str(peer->nid),
1873                                            via, router,
1874                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1875                                                 clbody->oa.o_parent_seq : 0ULL,
1876                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1877                                                 clbody->oa.o_parent_oid : 0,
1878                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1879                                                 clbody->oa.o_parent_ver : 0,
1880                                            POSTID(&body->oa.o_oi),
1881                                            aa->aa_ppga[0]->off,
1882                                            aa->aa_ppga[page_count-1]->off +
1883                                            aa->aa_ppga[page_count-1]->count - 1,
1884                                            client_cksum, server_cksum,
1885                                            cksum_type);
1886                         cksum_counter = 0;
1887                         aa->aa_oa->o_cksum = client_cksum;
1888                         rc = -EAGAIN;
1889                 } else {
1890                         cksum_counter++;
1891                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1892                         rc = 0;
1893                 }
1894         } else if (unlikely(client_cksum)) {
1895                 static int cksum_missed;
1896
1897                 cksum_missed++;
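                     /* (x & -x) == x only when x is a power of two, so this
                      * throttles the message to every 1, 2, 4, 8, ... misses. */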
1898                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1899                         CERROR("%s: checksum %u requested from %s but not sent\n",
1900                                obd_name, cksum_missed,
1901                                libcfs_nid2str(peer->nid));
1902         } else {
1903                 rc = 0;
1904         }
1905 out:
1906         if (rc >= 0)
1907                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1908                                      aa->aa_oa, &body->oa);
1909
1910         RETURN(rc);
1911 }
1912
1913 static int osc_brw_redo_request(struct ptlrpc_request *request,
1914                                 struct osc_brw_async_args *aa, int rc)
1915 {
1916         struct ptlrpc_request *new_req;
1917         struct osc_brw_async_args *new_aa;
1918         struct osc_async_page *oap;
1919         ENTRY;
1920
1921         /* The below message is checked in replay-ost-single.sh test_8ae */
1922         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1923                   "redo for recoverable error %d", rc);
1924
1925         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1926                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1927                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1928                                   aa->aa_ppga, &new_req, 1);
1929         if (rc)
1930                 RETURN(rc);
1931
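             /* Sanity check: every async page still queued on this RPC must
              * reference the request being redone. */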
1932         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1933                 if (oap->oap_request != NULL) {
1934                         LASSERTF(request == oap->oap_request,
1935                                  "request %p != oap_request %p\n",
1936                                  request, oap->oap_request);
1937                 }
1938         }
1939         /*
1940          * New request takes over pga and oaps from old request.
1941          * Note that copying a list_head doesn't work, need to move it...
1942          */
1943         aa->aa_resends++;
1944         new_req->rq_interpret_reply = request->rq_interpret_reply;
1945         new_req->rq_async_args = request->rq_async_args;
1946         new_req->rq_commit_cb = request->rq_commit_cb;
1947         /* cap the resend delay to the current request timeout; this is
1948          * similar to what ptlrpc does (see after_reply()) */
1949         if (aa->aa_resends > new_req->rq_timeout)
1950                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1951         else
1952                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1953         new_req->rq_generation_set = 1;
1954         new_req->rq_import_generation = request->rq_import_generation;
1955
1956         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1957
1958         INIT_LIST_HEAD(&new_aa->aa_oaps);
1959         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1960         INIT_LIST_HEAD(&new_aa->aa_exts);
1961         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1962         new_aa->aa_resends = aa->aa_resends;
1963
1964         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1965                 if (oap->oap_request) {
1966                         ptlrpc_req_finished(oap->oap_request);
1967                         oap->oap_request = ptlrpc_request_addref(new_req);
1968                 }
1969         }
1970
1971         /* XXX: This code will run into problems if we ever support adding
1972          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1973          * waiting for all of them to finish. We should inherit the request
1974          * set from the old request. */
1975         ptlrpcd_add_req(new_req);
1976
1977         DEBUG_REQ(D_INFO, new_req, "new request");
1978         RETURN(0);
1979 }
1980
1981 /*
1982  * We want disk allocation on the target to happen in offset order.  We'll
1983  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
1984  * do fine for our small page arrays and doesn't require allocation.  It's an
1985  * insertion sort that swaps elements that are strides apart, shrinking the
1986  * stride down until it's 1 and the array is sorted.
1987  */
1988 static void sort_brw_pages(struct brw_page **array, int num)
1989 {
1990         int stride, i, j;
1991         struct brw_page *tmp;
1992
1993         if (num == 1)
1994                 return;
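             /* Grow the stride through Knuth's gap sequence (1, 4, 13, 40, ...)
              * until it reaches or exceeds the array length, then shrink it
              * back down in the loop below. */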
1995         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1996                 ;
1997
1998         do {
1999                 stride /= 3;
2000                 for (i = stride ; i < num ; i++) {
2001                         tmp = array[i];
2002                         j = i;
2003                         while (j >= stride && array[j - stride]->off > tmp->off) {
2004                                 array[j] = array[j - stride];
2005                                 j -= stride;
2006                         }
2007                         array[j] = tmp;
2008                 }
2009         } while (stride > 1);
2010 }
2011
2012 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2013 {
2014         LASSERT(ppga != NULL);
2015         OBD_FREE_PTR_ARRAY(ppga, count);
2016 }
2017
2018 static int brw_interpret(const struct lu_env *env,
2019                          struct ptlrpc_request *req, void *args, int rc)
2020 {
2021         struct osc_brw_async_args *aa = args;
2022         struct osc_extent *ext;
2023         struct osc_extent *tmp;
2024         struct client_obd *cli = aa->aa_cli;
2025         unsigned long transferred = 0;
2026
2027         ENTRY;
2028
2029         rc = osc_brw_fini_request(req, rc);
2030         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2031         /*
2032          * When server returns -EINPROGRESS, client should always retry
2033          * regardless of the number of times the bulk was resent already.
2034          */
2035         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2036                 if (req->rq_import_generation !=
2037                     req->rq_import->imp_generation) {
2038                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2039                                ""DOSTID", rc = %d.\n",
2040                                req->rq_import->imp_obd->obd_name,
2041                                POSTID(&aa->aa_oa->o_oi), rc);
2042                 } else if (rc == -EINPROGRESS ||
2043                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2044                         rc = osc_brw_redo_request(req, aa, rc);
2045                 } else {
2046                         CERROR("%s: too many resent retries for object: "
2047                                "%llu:%llu, rc = %d.\n",
2048                                req->rq_import->imp_obd->obd_name,
2049                                POSTID(&aa->aa_oa->o_oi), rc);
2050                 }
2051
2052                 if (rc == 0)
2053                         RETURN(0);
2054                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2055                         rc = -EIO;
2056         }
2057
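             /* On success, fold the attributes returned by the OST (blocks and
              * timestamps, plus size/KMS for writes) back into the cl_object. */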
2058         if (rc == 0) {
2059                 struct obdo *oa = aa->aa_oa;
2060                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2061                 unsigned long valid = 0;
2062                 struct cl_object *obj;
2063                 struct osc_async_page *last;
2064
2065                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2066                 obj = osc2cl(last->oap_obj);
2067
2068                 cl_object_attr_lock(obj);
2069                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2070                         attr->cat_blocks = oa->o_blocks;
2071                         valid |= CAT_BLOCKS;
2072                 }
2073                 if (oa->o_valid & OBD_MD_FLMTIME) {
2074                         attr->cat_mtime = oa->o_mtime;
2075                         valid |= CAT_MTIME;
2076                 }
2077                 if (oa->o_valid & OBD_MD_FLATIME) {
2078                         attr->cat_atime = oa->o_atime;
2079                         valid |= CAT_ATIME;
2080                 }
2081                 if (oa->o_valid & OBD_MD_FLCTIME) {
2082                         attr->cat_ctime = oa->o_ctime;
2083                         valid |= CAT_CTIME;
2084                 }
2085
2086                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2087                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2088                         loff_t last_off = last->oap_count + last->oap_obj_off +
2089                                 last->oap_page_off;
2090
2091                         /* Change the file size if this is an out-of-quota or
2092                          * direct IO write and it extends the file size */
2093                         if (loi->loi_lvb.lvb_size < last_off) {
2094                                 attr->cat_size = last_off;
2095                                 valid |= CAT_SIZE;
2096                         }
2097                         /* Extend KMS if it's not a lockless write */
2098                         if (loi->loi_kms < last_off &&
2099                             oap2osc_page(last)->ops_srvlock == 0) {
2100                                 attr->cat_kms = last_off;
2101                                 valid |= CAT_KMS;
2102                         }
2103                 }
2104
2105                 if (valid != 0)
2106                         cl_object_attr_update(env, obj, attr, valid);
2107                 cl_object_attr_unlock(obj);
2108         }
2109         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2110         aa->aa_oa = NULL;
2111
2112         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2113                 osc_inc_unstable_pages(req);
2114
2115         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2116                 list_del_init(&ext->oe_link);
2117                 osc_extent_finish(env, ext, 1,
2118                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2119         }
2120         LASSERT(list_empty(&aa->aa_exts));
2121         LASSERT(list_empty(&aa->aa_oaps));
2122
2123         transferred = (req->rq_bulk == NULL ? /* short io */
2124                        aa->aa_requested_nob :
2125                        req->rq_bulk->bd_nob_transferred);
2126
2127         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2128         ptlrpc_lprocfs_brw(req, transferred);
2129
2130         spin_lock(&cli->cl_loi_list_lock);
2131         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2132          * is called so we know whether to go to sync BRWs or wait for more
2133          * RPCs to complete */
2134         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2135                 cli->cl_w_in_flight--;
2136         else
2137                 cli->cl_r_in_flight--;
2138         osc_wake_cache_waiters(cli);
2139         spin_unlock(&cli->cl_loi_list_lock);
2140
2141         osc_io_unplug(env, cli, NULL);
2142         RETURN(rc);
2143 }
2144
2145 static void brw_commit(struct ptlrpc_request *req)
2146 {
2147         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2148          * this being called via the rq_commit_cb, we need to ensure
2149          * osc_dec_unstable_pages is still called. Otherwise unstable
2150          * pages may be leaked. */
2151         spin_lock(&req->rq_lock);
2152         if (likely(req->rq_unstable)) {
2153                 req->rq_unstable = 0;
2154                 spin_unlock(&req->rq_lock);
2155
2156                 osc_dec_unstable_pages(req);
2157         } else {
2158                 req->rq_committed = 1;
2159                 spin_unlock(&req->rq_lock);
2160         }
2161 }
2162
2163 /**
2164  * Build an RPC by the list of extent @ext_list. The caller must ensure
2165  * that the total pages in this list are NOT over max pages per RPC.
2166  * Extents in the list must be in OES_RPC state.
2167  */
2168 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2169                   struct list_head *ext_list, int cmd)
2170 {
2171         struct ptlrpc_request           *req = NULL;
2172         struct osc_extent               *ext;
2173         struct brw_page                 **pga = NULL;
2174         struct osc_brw_async_args       *aa = NULL;
2175         struct obdo                     *oa = NULL;
2176         struct osc_async_page           *oap;
2177         struct osc_object               *obj = NULL;
2178         struct cl_req_attr              *crattr = NULL;
2179         loff_t                          starting_offset = OBD_OBJECT_EOF;
2180         loff_t                          ending_offset = 0;
2181         int                             mpflag = 0;
2182         int                             mem_tight = 0;
2183         int                             page_count = 0;
2184         bool                            soft_sync = false;
2185         bool                            ndelay = false;
2186         int                             i;
2187         int                             grant = 0;
2188         int                             rc;
2189         __u32                           layout_version = 0;
2190         LIST_HEAD(rpc_list);
2191         struct ost_body                 *body;
2192         ENTRY;
2193         LASSERT(!list_empty(ext_list));
2194
2195         /* add pages into rpc_list to build BRW rpc */
2196         list_for_each_entry(ext, ext_list, oe_link) {
2197                 LASSERT(ext->oe_state == OES_RPC);
2198                 mem_tight |= ext->oe_memalloc;
2199                 grant += ext->oe_grants;
2200                 page_count += ext->oe_nr_pages;
2201                 layout_version = max(layout_version, ext->oe_layout_version);
2202                 if (obj == NULL)
2203                         obj = ext->oe_obj;
2204         }
2205
2206         soft_sync = osc_over_unstable_soft_limit(cli);
2207         if (mem_tight)
2208                 mpflag = cfs_memory_pressure_get_and_set();
2209
2210         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2211         if (pga == NULL)
2212                 GOTO(out, rc = -ENOMEM);
2213
2214         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2215         if (oa == NULL)
2216                 GOTO(out, rc = -ENOMEM);
2217
2218         i = 0;
2219         list_for_each_entry(ext, ext_list, oe_link) {
2220                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2221                         if (mem_tight)
2222                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2223                         if (soft_sync)
2224                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2225                         pga[i] = &oap->oap_brw_page;
2226                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2227                         i++;
2228
2229                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2230                         if (starting_offset == OBD_OBJECT_EOF ||
2231                             starting_offset > oap->oap_obj_off)
2232                                 starting_offset = oap->oap_obj_off;
2233                         else
2234                                 LASSERT(oap->oap_page_off == 0);
2235                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2236                                 ending_offset = oap->oap_obj_off +
2237                                                 oap->oap_count;
2238                         else
2239                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2240                                         PAGE_SIZE);
2241                 }
2242                 if (ext->oe_ndelay)
2243                         ndelay = true;
2244         }
2245
2246         /* first page in the list */
2247         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2248
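             /* Have the upper layers fill in the request attributes (the obdo
              * fields and jobid) for this IO via cl_req_attr_set(). */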
2249         crattr = &osc_env_info(env)->oti_req_attr;
2250         memset(crattr, 0, sizeof(*crattr));
2251         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2252         crattr->cra_flags = ~0ULL;
2253         crattr->cra_page = oap2cl_page(oap);
2254         crattr->cra_oa = oa;
2255         cl_req_attr_set(env, osc2cl(obj), crattr);
2256
2257         if (cmd == OBD_BRW_WRITE) {
2258                 oa->o_grant_used = grant;
2259                 if (layout_version > 0) {
2260                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2261                                PFID(&oa->o_oi.oi_fid), layout_version);
2262
2263                         oa->o_layout_version = layout_version;
2264                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2265                 }
2266         }
2267
2268         sort_brw_pages(pga, page_count);
2269         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2270         if (rc != 0) {
2271                 CERROR("prep_req failed: %d\n", rc);
2272                 GOTO(out, rc);
2273         }
2274
2275         req->rq_commit_cb = brw_commit;
2276         req->rq_interpret_reply = brw_interpret;
2277         req->rq_memalloc = mem_tight != 0;
2278         oap->oap_request = ptlrpc_request_addref(req);
2279         if (ndelay) {
2280                 req->rq_no_resend = req->rq_no_delay = 1;
2281                 /* Probably we should set a shorter timeout value
2282                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2283                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2284         }
2285
2286         /* Need to update the timestamps after the request is built in case
2287          * we race with setattr (locally or in queue at OST).  If OST gets
2288          * later setattr before earlier BRW (as determined by the request xid),
2289          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2290          * way to do this in a single call.  bug 10150 */
2291         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2292         crattr->cra_oa = &body->oa;
2293         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2294         cl_req_attr_set(env, osc2cl(obj), crattr);
2295         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2296
2297         aa = ptlrpc_req_async_args(aa, req);
2298         INIT_LIST_HEAD(&aa->aa_oaps);
2299         list_splice_init(&rpc_list, &aa->aa_oaps);
2300         INIT_LIST_HEAD(&aa->aa_exts);
2301         list_splice_init(ext_list, &aa->aa_exts);
2302
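             /* Account this RPC as in flight and feed the page count and
              * starting offset into the per-client read/write histograms. */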
2303         spin_lock(&cli->cl_loi_list_lock);
2304         starting_offset >>= PAGE_SHIFT;
2305         if (cmd == OBD_BRW_READ) {
2306                 cli->cl_r_in_flight++;
2307                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2308                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2309                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2310                                       starting_offset + 1);
2311         } else {
2312                 cli->cl_w_in_flight++;
2313                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2314                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2315                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2316                                       starting_offset + 1);
2317         }
2318         spin_unlock(&cli->cl_loi_list_lock);
2319
2320         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2321                   page_count, aa, cli->cl_r_in_flight,
2322                   cli->cl_w_in_flight);
2323         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2324
2325         ptlrpcd_add_req(req);
2326         rc = 0;
2327         EXIT;
2328
2329 out:
2330         if (mem_tight != 0)
2331                 cfs_memory_pressure_restore(mpflag);
2332
2333         if (rc != 0) {
2334                 LASSERT(req == NULL);
2335
2336                 if (oa)
2337                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2338                 if (pga)
2339                         OBD_FREE_PTR_ARRAY(pga, page_count);
2340                 /* this should happen rarely and is pretty bad; it makes the
2341                  * pending list not follow the dirty order */
2342                 while (!list_empty(ext_list)) {
2343                         ext = list_entry(ext_list->next, struct osc_extent,
2344                                          oe_link);
2345                         list_del_init(&ext->oe_link);
2346                         osc_extent_finish(env, ext, 0, rc);
2347                 }
2348         }
2349         RETURN(rc);
2350 }
2351
2352 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2353 {
2354         int set = 0;
2355
2356         LASSERT(lock != NULL);
2357
2358         lock_res_and_lock(lock);
2359
2360         if (lock->l_ast_data == NULL)
2361                 lock->l_ast_data = data;
2362         if (lock->l_ast_data == data)
2363                 set = 1;
2364
2365         unlock_res_and_lock(lock);
2366
2367         return set;
2368 }
2369
2370 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2371                      void *cookie, struct lustre_handle *lockh,
2372                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2373                      int errcode)
2374 {
2375         bool intent = *flags & LDLM_FL_HAS_INTENT;
2376         int rc;
2377         ENTRY;
2378
2379         /* The request was created before ldlm_cli_enqueue call. */
2380         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2381                 struct ldlm_reply *rep;
2382
2383                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2384                 LASSERT(rep != NULL);
2385
2386                 rep->lock_policy_res1 =
2387                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2388                 if (rep->lock_policy_res1)
2389                         errcode = rep->lock_policy_res1;
2390                 if (!speculative)
2391                         *flags |= LDLM_FL_LVB_READY;
2392         } else if (errcode == ELDLM_OK) {
2393                 *flags |= LDLM_FL_LVB_READY;
2394         }
2395
2396         /* Call the update callback. */
2397         rc = (*upcall)(cookie, lockh, errcode);
2398
2399         /* release the reference taken in ldlm_cli_enqueue() */
2400         if (errcode == ELDLM_LOCK_MATCHED)
2401                 errcode = ELDLM_OK;
2402         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2403                 ldlm_lock_decref(lockh, mode);
2404
2405         RETURN(rc);
2406 }
2407
2408 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2409                           void *args, int rc)
2410 {
2411         struct osc_enqueue_args *aa = args;
2412         struct ldlm_lock *lock;
2413         struct lustre_handle *lockh = &aa->oa_lockh;
2414         enum ldlm_mode mode = aa->oa_mode;
2415         struct ost_lvb *lvb = aa->oa_lvb;
2416         __u32 lvb_len = sizeof(*lvb);
2417         __u64 flags = 0;
2418
2419         ENTRY;
2420
2421         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2422          * be valid. */
2423         lock = ldlm_handle2lock(lockh);
2424         LASSERTF(lock != NULL,
2425                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2426                  lockh->cookie, req, aa);
2427
2428         /* Take an additional reference so that a blocking AST that
2429          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2430          * to arrive after an upcall has been executed by
2431          * osc_enqueue_fini(). */
2432         ldlm_lock_addref(lockh, mode);
2433
2434         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2435         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2436
2437         /* Let the CP AST grant the lock first. */
2438         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2439
2440         if (aa->oa_speculative) {
2441                 LASSERT(aa->oa_lvb == NULL);
2442                 LASSERT(aa->oa_flags == NULL);
2443                 aa->oa_flags = &flags;
2444         }
2445
2446         /* Complete obtaining the lock procedure. */
2447         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2448                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2449                                    lockh, rc);
2450         /* Complete osc stuff. */
2451         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2452                               aa->oa_flags, aa->oa_speculative, rc);
2453
2454         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2455
2456         ldlm_lock_decref(lockh, mode);
2457         LDLM_LOCK_PUT(lock);
2458         RETURN(rc);
2459 }
2460
2461 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2462  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2463  * with other synchronous requests, but keeping some locks while trying to
2464  * obtain others may take a considerable amount of time in case of OST failure;
2465  * and when other sync requests do not get a released lock from a client, the
2466  * client is evicted from the cluster -- such scenarios make life difficult,
2467  * so release locks just after they are obtained. */
2468 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2469                      __u64 *flags, union ldlm_policy_data *policy,
2470                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2471                      void *cookie, struct ldlm_enqueue_info *einfo,
2472                      struct ptlrpc_request_set *rqset, int async,
2473                      bool speculative)
2474 {
2475         struct obd_device *obd = exp->exp_obd;
2476         struct lustre_handle lockh = { 0 };
2477         struct ptlrpc_request *req = NULL;
2478         int intent = *flags & LDLM_FL_HAS_INTENT;
2479         __u64 match_flags = *flags;
2480         enum ldlm_mode mode;
2481         int rc;
2482         ENTRY;
2483
2484         /* Filesystem lock extents are extended to page boundaries so that
2485          * dealing with the page cache is a little smoother.  */
2486         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2487         policy->l_extent.end |= ~PAGE_MASK;
2488
2489         /* Next, search for already existing extent locks that will cover us */
2490         /* If we're trying to read, we also search for an existing PW lock.  The
2491          * VFS and page cache already protect us locally, so lots of readers/
2492          * writers can share a single PW lock.
2493          *
2494          * There are problems with conversion deadlocks, so instead of
2495          * converting a read lock to a write lock, we'll just enqueue a new
2496          * one.
2497          *
2498          * At some point we should cancel the read lock instead of making them
2499          * send us a blocking callback, but there are problems with canceling
2500          * locks out from other users right now, too. */
2501         mode = einfo->ei_mode;
2502         if (einfo->ei_mode == LCK_PR)
2503                 mode |= LCK_PW;
2504         /* Normal lock requests must wait for the LVB to be ready before
2505          * matching a lock; speculative lock requests do not need to,
2506          * because they will not actually use the lock. */
2507         if (!speculative)
2508                 match_flags |= LDLM_FL_LVB_READY;
2509         if (intent != 0)
2510                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2511         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2512                                einfo->ei_type, policy, mode, &lockh, 0);
2513         if (mode) {
2514                 struct ldlm_lock *matched;
2515
2516                 if (*flags & LDLM_FL_TEST_LOCK)
2517                         RETURN(ELDLM_OK);
2518
2519                 matched = ldlm_handle2lock(&lockh);
2520                 if (speculative) {
2521                         /* This DLM lock request is speculative, and does not
2522                          * have an associated IO request. Therefore if there
2523                          * is already a DLM lock, it will just inform the
2524                          * caller to cancel the request for this stripe. */
2525                         lock_res_and_lock(matched);
2526                         if (ldlm_extent_equal(&policy->l_extent,
2527                             &matched->l_policy_data.l_extent))
2528                                 rc = -EEXIST;
2529                         else
2530                                 rc = -ECANCELED;
2531                         unlock_res_and_lock(matched);
2532
2533                         ldlm_lock_decref(&lockh, mode);
2534                         LDLM_LOCK_PUT(matched);
2535                         RETURN(rc);
2536                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2537                         *flags |= LDLM_FL_LVB_READY;
2538
2539                         /* We already have a lock, and it's referenced. */
2540                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2541
2542                         ldlm_lock_decref(&lockh, mode);
2543                         LDLM_LOCK_PUT(matched);
2544                         RETURN(ELDLM_OK);
2545                 } else {
2546                         ldlm_lock_decref(&lockh, mode);
2547                         LDLM_LOCK_PUT(matched);
2548                 }
2549         }
2550
2551         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2552                 RETURN(-ENOLCK);
2553
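             /* No usable lock was matched above, so enqueue a new one; an
              * intent enqueue also needs room in the reply for the LVB, hence
              * the extra reply buffer space reserved below. */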
2554         if (intent) {
2555                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2556                                            &RQF_LDLM_ENQUEUE_LVB);
2557                 if (req == NULL)
2558                         RETURN(-ENOMEM);
2559
2560                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2561                 if (rc) {
2562                         ptlrpc_request_free(req);
2563                         RETURN(rc);
2564                 }
2565
2566                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2567                                      sizeof(*lvb));
2568                 ptlrpc_request_set_replen(req);
2569         }
2570
2571         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2572         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2573
2574         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2575                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2576         if (async) {
2577                 if (!rc) {
2578                         struct osc_enqueue_args *aa;
2579                         aa = ptlrpc_req_async_args(aa, req);
2580                         aa->oa_exp         = exp;
2581                         aa->oa_mode        = einfo->ei_mode;
2582                         aa->oa_type        = einfo->ei_type;
2583                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2584                         aa->oa_upcall      = upcall;
2585                         aa->oa_cookie      = cookie;
2586                         aa->oa_speculative = speculative;
2587                         if (!speculative) {
2588                                 aa->oa_flags  = flags;
2589                                 aa->oa_lvb    = lvb;
2590                         } else {
2591                                 /* Speculative locks essentially enqueue a
2592                                  * DLM lock in advance, so we don't care
2593                                  * about the result of the enqueue. */
2594                                 aa->oa_lvb    = NULL;
2595                                 aa->oa_flags  = NULL;
2596                         }
2597
2598                         req->rq_interpret_reply = osc_enqueue_interpret;
2599                         ptlrpc_set_add_req(rqset, req);
2600                 } else if (intent) {
2601                         ptlrpc_req_finished(req);
2602                 }
2603                 RETURN(rc);
2604         }
2605
2606         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2607                               flags, speculative, rc);
2608         if (intent)
2609                 ptlrpc_req_finished(req);
2610
2611         RETURN(rc);
2612 }
2613
2614 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2615                    struct ldlm_res_id *res_id, enum ldlm_type type,
2616                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2617                    __u64 *flags, struct osc_object *obj,
2618                    struct lustre_handle *lockh, int unref)
2619 {
2620         struct obd_device *obd = exp->exp_obd;
2621         __u64 lflags = *flags;
2622         enum ldlm_mode rc;
2623         ENTRY;
2624
2625         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2626                 RETURN(-EIO);
2627
2628         /* Filesystem lock extents are extended to page boundaries so that
2629          * dealing with the page cache is a little smoother */
2630         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2631         policy->l_extent.end |= ~PAGE_MASK;
2632
2633         /* Next, search for already existing extent locks that will cover us */
2634         /* If we're trying to read, we also search for an existing PW lock.  The
2635          * VFS and page cache already protect us locally, so lots of readers/
2636          * writers can share a single PW lock. */
2637         rc = mode;
2638         if (mode == LCK_PR)
2639                 rc |= LCK_PW;
2640         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2641                              res_id, type, policy, rc, lockh, unref);
2642         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2643                 RETURN(rc);
2644
2645         if (obj != NULL) {
2646                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2647
2648                 LASSERT(lock != NULL);
2649                 if (osc_set_lock_data(lock, obj)) {
2650                         lock_res_and_lock(lock);
2651                         if (!ldlm_is_lvb_cached(lock)) {
2652                                 LASSERT(lock->l_ast_data == obj);
2653                                 osc_lock_lvb_update(env, obj, lock, NULL);
2654                                 ldlm_set_lvb_cached(lock);
2655                         }
2656                         unlock_res_and_lock(lock);
2657                 } else {
2658                         ldlm_lock_decref(lockh, rc);
2659                         rc = 0;
2660                 }
2661                 LDLM_LOCK_PUT(lock);
2662         }
2663         RETURN(rc);
2664 }
2665
2666 static int osc_statfs_interpret(const struct lu_env *env,
2667                                 struct ptlrpc_request *req, void *args, int rc)
2668 {
2669         struct osc_async_args *aa = args;
2670         struct obd_statfs *msfs;
2671
2672         ENTRY;
2673         if (rc == -EBADR)
2674                 /*
2675                  * The request has in fact never been sent due to issues at
2676                  * a higher level (LOV).  Exit immediately since the caller
2677                  * is aware of the problem and takes care of the clean up.
2678                  */
2679                 RETURN(rc);
2680
2681         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2682             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2683                 GOTO(out, rc = 0);
2684
2685         if (rc != 0)
2686                 GOTO(out, rc);
2687
2688         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2689         if (msfs == NULL)
2690                 GOTO(out, rc = -EPROTO);
2691
2692         *aa->aa_oi->oi_osfs = *msfs;
2693 out:
2694         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2695
2696         RETURN(rc);
2697 }
2698
2699 static int osc_statfs_async(struct obd_export *exp,
2700                             struct obd_info *oinfo, time64_t max_age,
2701                             struct ptlrpc_request_set *rqset)
2702 {
2703         struct obd_device     *obd = class_exp2obd(exp);
2704         struct ptlrpc_request *req;
2705         struct osc_async_args *aa;
2706         int rc;
2707         ENTRY;
2708
2709         if (obd->obd_osfs_age >= max_age) {
2710                 CDEBUG(D_SUPER,
2711                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2712                        obd->obd_name, &obd->obd_osfs,
2713                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2714                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2715                 spin_lock(&obd->obd_osfs_lock);
2716                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2717                 spin_unlock(&obd->obd_osfs_lock);
2718                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2719                 if (oinfo->oi_cb_up)
2720                         oinfo->oi_cb_up(oinfo, 0);
2721
2722                 RETURN(0);
2723         }
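        /*
         * Note that max_age is an absolute timestamp, not an age: a caller
         * that can tolerate data up to N seconds old would pass something
         * like ktime_get_seconds() - N, and the cached obd_osfs above is
         * reused whenever it was refreshed at or after that time.
         */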
2724
2725         /* We could possibly pass max_age in the request (as an absolute
2726          * timestamp or a "seconds.usec ago") so the target can avoid doing
2727          * extra calls into the filesystem if that isn't necessary (e.g.
2728          * during mount that would help a bit).  Having relative timestamps
2729          * is not so great if request processing is slow, while absolute
2730          * timestamps are not ideal because they need time synchronization. */
2731         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2732         if (req == NULL)
2733                 RETURN(-ENOMEM);
2734
2735         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2736         if (rc) {
2737                 ptlrpc_request_free(req);
2738                 RETURN(rc);
2739         }
2740         ptlrpc_request_set_replen(req);
2741         req->rq_request_portal = OST_CREATE_PORTAL;
2742         ptlrpc_at_set_req_timeout(req);
2743
2744         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2745                 /* procfs requests should not wait for recovery, to avoid deadlock */
2746                 req->rq_no_resend = 1;
2747                 req->rq_no_delay = 1;
2748         }
2749
2750         req->rq_interpret_reply = osc_statfs_interpret;
2751         aa = ptlrpc_req_async_args(aa, req);
2752         aa->aa_oi = oinfo;
2753
2754         ptlrpc_set_add_req(rqset, req);
2755         RETURN(0);
2756 }
2757
2758 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2759                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2760 {
2761         struct obd_device     *obd = class_exp2obd(exp);
2762         struct obd_statfs     *msfs;
2763         struct ptlrpc_request *req;
2764         struct obd_import     *imp = NULL;
2765         int rc;
2766         ENTRY;
2767
2768
2769         /* Since the request might also come from lprocfs, we need to
2770          * sync this with client_disconnect_export() (bug 15684). */
2771         down_read(&obd->u.cli.cl_sem);
2772         if (obd->u.cli.cl_import)
2773                 imp = class_import_get(obd->u.cli.cl_import);
2774         up_read(&obd->u.cli.cl_sem);
2775         if (!imp)
2776                 RETURN(-ENODEV);
2777
2778         /* We could possibly pass max_age in the request (as an absolute
2779          * timestamp or a "seconds.usec ago") so the target can avoid doing
2780          * extra calls into the filesystem if that isn't necessary (e.g.
2781          * during mount that would help a bit).  Having relative timestamps
2782          * is not so great if request processing is slow, while absolute
2783          * timestamps are not ideal because they need time synchronization. */
2784         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2785
2786         class_import_put(imp);
2787
2788         if (req == NULL)
2789                 RETURN(-ENOMEM);
2790
2791         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2792         if (rc) {
2793                 ptlrpc_request_free(req);
2794                 RETURN(rc);
2795         }
2796         ptlrpc_request_set_replen(req);
2797         req->rq_request_portal = OST_CREATE_PORTAL;
2798         ptlrpc_at_set_req_timeout(req);
2799
2800         if (flags & OBD_STATFS_NODELAY) {
2801                 /* procfs requests should not wait for recovery, to avoid deadlock */
2802                 req->rq_no_resend = 1;
2803                 req->rq_no_delay = 1;
2804         }
2805
2806         rc = ptlrpc_queue_wait(req);
2807         if (rc)
2808                 GOTO(out, rc);
2809
2810         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2811         if (msfs == NULL)
2812                 GOTO(out, rc = -EPROTO);
2813
2814         *osfs = *msfs;
2815
2816         EXIT;
2817 out:
2818         ptlrpc_req_finished(req);
2819         return rc;
2820 }
2821
2822 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2823                          void *karg, void __user *uarg)
2824 {
2825         struct obd_device *obd = exp->exp_obd;
2826         struct obd_ioctl_data *data = karg;
2827         int rc = 0;
2828
2829         ENTRY;
2830         if (!try_module_get(THIS_MODULE)) {
2831                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2832                        module_name(THIS_MODULE));
2833                 return -EINVAL;
2834         }
2835         switch (cmd) {
2836         case OBD_IOC_CLIENT_RECOVER:
2837                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2838                                            data->ioc_inlbuf1, 0);
2839                 if (rc > 0)
2840                         rc = 0;
2841                 break;
2842         case IOC_OSC_SET_ACTIVE:
2843                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2844                                               data->ioc_offset);
2845                 break;
2846         default:
2847                 rc = -ENOTTY;
2848                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2849                        obd->obd_name, cmd, current->comm, rc);
2850                 break;
2851         }
2852
2853         module_put(THIS_MODULE);
2854         return rc;
2855 }
2856
2857 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2858                        u32 keylen, void *key, u32 vallen, void *val,
2859                        struct ptlrpc_request_set *set)
2860 {
2861         struct ptlrpc_request *req;
2862         struct obd_device     *obd = exp->exp_obd;
2863         struct obd_import     *imp = class_exp2cliimp(exp);
2864         char                  *tmp;
2865         int                    rc;
2866         ENTRY;
2867
2868         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2869
2870         if (KEY_IS(KEY_CHECKSUM)) {
2871                 if (vallen != sizeof(int))
2872                         RETURN(-EINVAL);
2873                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2874                 RETURN(0);
2875         }
2876
2877         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2878                 sptlrpc_conf_client_adapt(obd);
2879                 RETURN(0);
2880         }
2881
2882         if (KEY_IS(KEY_FLUSH_CTX)) {
2883                 sptlrpc_import_flush_my_ctx(imp);
2884                 RETURN(0);
2885         }
2886
2887         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2888                 struct client_obd *cli = &obd->u.cli;
2889                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2890                 long target = *(long *)val;
2891
2892                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2893                 *(long *)val -= nr;
2894                 RETURN(0);
2895         }
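        /*
         * A worked example of the accounting above: with 200 pages in
         * cl_lru_in_list and a caller target of 70, at most min(100, 70)
         * pages are handed to osc_lru_shrink(); if it reclaims 50, the
         * caller's remaining target drops to 20, so a caller sweeping
         * several OSCs can stop as soon as the target reaches zero.
         */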
2896
2897         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2898                 RETURN(-EINVAL);
2899
2900         /* We pass all other commands directly to the OST. Since nobody calls
2901            osc methods directly and everybody is supposed to go through LOV,
2902            we assume LOV has checked the values for us.
2903            The only recognised values so far are evict_by_nid and mds_conn.
2904            Even if something bad gets through, we'd get an -EINVAL from the
2905            OST anyway. */
2906
2907         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2908                                                 &RQF_OST_SET_GRANT_INFO :
2909                                                 &RQF_OBD_SET_INFO);
2910         if (req == NULL)
2911                 RETURN(-ENOMEM);
2912
2913         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2914                              RCL_CLIENT, keylen);
2915         if (!KEY_IS(KEY_GRANT_SHRINK))
2916                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2917                                      RCL_CLIENT, vallen);
2918         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2919         if (rc) {
2920                 ptlrpc_request_free(req);
2921                 RETURN(rc);
2922         }
2923
2924         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2925         memcpy(tmp, key, keylen);
2926         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2927                                                         &RMF_OST_BODY :
2928                                                         &RMF_SETINFO_VAL);
2929         memcpy(tmp, val, vallen);
2930
2931         if (KEY_IS(KEY_GRANT_SHRINK)) {
2932                 struct osc_grant_args *aa;
2933                 struct obdo *oa;
2934
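                /*
                 * The ost_body passed in via val belongs to the caller and
                 * may not outlive this call, while the request completes
                 * asynchronously via ptlrpcd; hence the obdo is copied into
                 * slab memory below for osc_shrink_grant_interpret() to use
                 * (and free) when the reply arrives.
                 */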
2935                 aa = ptlrpc_req_async_args(aa, req);
2936                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2937                 if (!oa) {
2938                         ptlrpc_req_finished(req);
2939                         RETURN(-ENOMEM);
2940                 }
2941                 *oa = ((struct ost_body *)val)->oa;
2942                 aa->aa_oa = oa;
2943                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2944         }
2945
2946         ptlrpc_request_set_replen(req);
2947         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2948                 LASSERT(set != NULL);
2949                 ptlrpc_set_add_req(set, req);
2950                 ptlrpc_check_set(NULL, set);
2951         } else {
2952                 ptlrpcd_add_req(req);
2953         }
2954
2955         RETURN(0);
2956 }
2957 EXPORT_SYMBOL(osc_set_info_async);
2958
2959 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2960                   struct obd_device *obd, struct obd_uuid *cluuid,
2961                   struct obd_connect_data *data, void *localdata)
2962 {
2963         struct client_obd *cli = &obd->u.cli;
2964
2965         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2966                 long lost_grant;
2967                 long grant;
2968
2969                 spin_lock(&cli->cl_loi_list_lock);
2970                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2971                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2972                         /* restore ocd_grant_blkbits as client page bits */
2973                         data->ocd_grant_blkbits = PAGE_SHIFT;
2974                         grant += cli->cl_dirty_grant;
2975                 } else {
2976                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2977                 }
2978                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2979                 lost_grant = cli->cl_lost_grant;
2980                 cli->cl_lost_grant = 0;
2981                 spin_unlock(&cli->cl_loi_list_lock);
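                /*
                 * Sketch of the arithmetic above with made-up numbers: with
                 * 4 MiB of available grant, 1 MiB reserved and 2 MiB of
                 * dirty data, the client asks the server to keep 7 MiB of
                 * grant across the reconnect; "grant ? : 2 * cli_brw_size()"
                 * falls back to two full BRW RPCs worth of grant if the
                 * current total happens to be zero.
                 */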
2982
2983                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2984                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2985                        data->ocd_version, data->ocd_grant, lost_grant);
2986         }
2987
2988         RETURN(0);
2989 }
2990 EXPORT_SYMBOL(osc_reconnect);
2991
2992 int osc_disconnect(struct obd_export *exp)
2993 {
2994         struct obd_device *obd = class_exp2obd(exp);
2995         int rc;
2996
2997         rc = client_disconnect_export(exp);
2998         /**
2999          * Initially we put del_shrink_grant before disconnect_export, but it
3000          * causes the following problem if setup (connect) and cleanup
3001          * (disconnect) are tangled together.
3002          *      connect p1                     disconnect p2
3003          *   ptlrpc_connect_import
3004          *     ...............               class_manual_cleanup
3005          *                                     osc_disconnect
3006          *                                     del_shrink_grant
3007          *   ptlrpc_connect_interpret
3008          *     osc_init_grant
3009          *   add this client to shrink list
3010          *                                      cleanup_osc
3011          * Bang! The grant shrink thread triggers the shrink. (bug 18662)
3012          */
3013         osc_del_grant_list(&obd->u.cli);
3014         return rc;
3015 }
3016 EXPORT_SYMBOL(osc_disconnect);
3017
3018 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3019                                  struct hlist_node *hnode, void *arg)
3020 {
3021         struct lu_env *env = arg;
3022         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3023         struct ldlm_lock *lock;
3024         struct osc_object *osc = NULL;
3025         ENTRY;
3026
3027         lock_res(res);
3028         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3029                 if (lock->l_ast_data != NULL && osc == NULL) {
3030                         osc = lock->l_ast_data;
3031                         cl_object_get(osc2cl(osc));
3032                 }
3033
3034                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will be
3035                  * canceled by the 2nd round of the ldlm_namespace_cleanup() call
3036                  * in osc_import_event(). */
3037                 ldlm_clear_cleaned(lock);
3038         }
3039         unlock_res(res);
3040
3041         if (osc != NULL) {
3042                 osc_object_invalidate(env, osc);
3043                 cl_object_put(env, osc2cl(osc));
3044         }
3045
3046         RETURN(0);
3047 }
3048 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3049
3050 static int osc_import_event(struct obd_device *obd,
3051                             struct obd_import *imp,
3052                             enum obd_import_event event)
3053 {
3054         struct client_obd *cli;
3055         int rc = 0;
3056
3057         ENTRY;
3058         LASSERT(imp->imp_obd == obd);
3059
3060         switch (event) {
3061         case IMP_EVENT_DISCON: {
3062                 cli = &obd->u.cli;
3063                 spin_lock(&cli->cl_loi_list_lock);
3064                 cli->cl_avail_grant = 0;
3065                 cli->cl_lost_grant = 0;
3066                 spin_unlock(&cli->cl_loi_list_lock);
3067                 break;
3068         }
3069         case IMP_EVENT_INACTIVE: {
3070                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3071                 break;
3072         }
3073         case IMP_EVENT_INVALIDATE: {
3074                 struct ldlm_namespace *ns = obd->obd_namespace;
3075                 struct lu_env         *env;
3076                 __u16                  refcheck;
3077
3078                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3079
3080                 env = cl_env_get(&refcheck);
3081                 if (!IS_ERR(env)) {
3082                         osc_io_unplug(env, &obd->u.cli, NULL);
3083
3084                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3085                                                  osc_ldlm_resource_invalidate,
3086                                                  env, 0);
3087                         cl_env_put(env, &refcheck);
3088
3089                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3090                 } else
3091                         rc = PTR_ERR(env);
3092                 break;
3093         }
3094         case IMP_EVENT_ACTIVE: {
3095                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3096                 break;
3097         }
3098         case IMP_EVENT_OCD: {
3099                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3100
3101                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3102                         osc_init_grant(&obd->u.cli, ocd);
3103
3104                 /* See bug 7198 */
3105                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3106                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3107
3108                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3109                 break;
3110         }
3111         case IMP_EVENT_DEACTIVATE: {
3112                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3113                 break;
3114         }
3115         case IMP_EVENT_ACTIVATE: {
3116                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3117                 break;
3118         }
3119         default:
3120                 CERROR("Unknown import event %d\n", event);
3121                 LBUG();
3122         }
3123         RETURN(rc);
3124 }
3125
3126 /**
3127  * Determine whether the lock can be canceled before replaying the lock
3128  * during recovery, see bug16774 for detailed information.
3129  *
3130  * \retval zero the lock can't be canceled
3131  * \retval other ok to cancel
3132  */
3133 static int osc_cancel_weight(struct ldlm_lock *lock)
3134 {
3135         /*
3136          * Cancel all unused, granted extent locks.
3137          */
3138         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3139             ldlm_is_granted(lock) &&
3140             osc_ldlm_weigh_ast(lock) == 0)
3141                 RETURN(1);
3142
3143         RETURN(0);
3144 }
3145
3146 static int brw_queue_work(const struct lu_env *env, void *data)
3147 {
3148         struct client_obd *cli = data;
3149
3150         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3151
3152         osc_io_unplug(env, cli, NULL);
3153         RETURN(0);
3154 }
3155
3156 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3157 {
3158         struct client_obd *cli = &obd->u.cli;
3159         void *handler;
3160         int rc;
3161
3162         ENTRY;
3163
3164         rc = ptlrpcd_addref();
3165         if (rc)
3166                 RETURN(rc);
3167
3168         rc = client_obd_setup(obd, lcfg);
3169         if (rc)
3170                 GOTO(out_ptlrpcd, rc);
3171
3172
3173         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3174         if (IS_ERR(handler))
3175                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3176         cli->cl_writeback_work = handler;
3177
3178         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3179         if (IS_ERR(handler))
3180                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3181         cli->cl_lru_work = handler;
3182
3183         rc = osc_quota_setup(obd);
3184         if (rc)
3185                 GOTO(out_ptlrpcd_work, rc);
3186
3187         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3188         osc_update_next_shrink(cli);
3189
3190         RETURN(rc);
3191
3192 out_ptlrpcd_work:
3193         if (cli->cl_writeback_work != NULL) {
3194                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3195                 cli->cl_writeback_work = NULL;
3196         }
3197         if (cli->cl_lru_work != NULL) {
3198                 ptlrpcd_destroy_work(cli->cl_lru_work);
3199                 cli->cl_lru_work = NULL;
3200         }
3201         client_obd_cleanup(obd);
3202 out_ptlrpcd:
3203         ptlrpcd_decref();
3204         RETURN(rc);
3205 }
3206 EXPORT_SYMBOL(osc_setup_common);
3207
3208 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3209 {
3210         struct client_obd *cli = &obd->u.cli;
3211         int                adding;
3212         int                added;
3213         int                req_count;
3214         int                rc;
3215
3216         ENTRY;
3217
3218         rc = osc_setup_common(obd, lcfg);
3219         if (rc < 0)
3220                 RETURN(rc);
3221
3222         rc = osc_tunables_init(obd);
3223         if (rc)
3224                 RETURN(rc);
3225
3226         /*
3227          * We try to control the total number of requests with an upper limit,
3228          * osc_reqpool_maxreqcount. There might be a race that causes over-limit
3229          * allocation, but that is fine.
3230          */
3231         req_count = atomic_read(&osc_pool_req_count);
3232         if (req_count < osc_reqpool_maxreqcount) {
3233                 adding = cli->cl_max_rpcs_in_flight + 2;
3234                 if (req_count + adding > osc_reqpool_maxreqcount)
3235                         adding = osc_reqpool_maxreqcount - req_count;
3236
3237                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3238                 atomic_add(added, &osc_pool_req_count);
3239         }
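        /*
         * Example of the benign race mentioned above: two imports being set
         * up concurrently can both read req_count just below the limit and
         * each add up to cl_max_rpcs_in_flight + 2 requests, so the pool may
         * end up slightly over osc_reqpool_maxreqcount. Nothing depends on
         * the limit being exact, so this merely costs a bit of extra memory.
         */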
3240
3241         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3242
3243         spin_lock(&osc_shrink_lock);
3244         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3245         spin_unlock(&osc_shrink_lock);
3246         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3247         cli->cl_import->imp_idle_debug = D_HA;
3248
3249         RETURN(0);
3250 }
3251
3252 int osc_precleanup_common(struct obd_device *obd)
3253 {
3254         struct client_obd *cli = &obd->u.cli;
3255         ENTRY;
3256
3257         /* LU-464
3258          * for echo client, export may be on zombie list, wait for
3259          * zombie thread to cull it, because cli.cl_import will be
3260          * cleared in client_disconnect_export():
3261          *   class_export_destroy() -> obd_cleanup() ->
3262          *   echo_device_free() -> echo_client_cleanup() ->
3263          *   obd_disconnect() -> osc_disconnect() ->
3264          *   client_disconnect_export()
3265          */
3266         obd_zombie_barrier();
3267         if (cli->cl_writeback_work) {
3268                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3269                 cli->cl_writeback_work = NULL;
3270         }
3271
3272         if (cli->cl_lru_work) {
3273                 ptlrpcd_destroy_work(cli->cl_lru_work);
3274                 cli->cl_lru_work = NULL;
3275         }
3276
3277         obd_cleanup_client_import(obd);
3278         RETURN(0);
3279 }
3280 EXPORT_SYMBOL(osc_precleanup_common);
3281
3282 static int osc_precleanup(struct obd_device *obd)
3283 {
3284         ENTRY;
3285
3286         osc_precleanup_common(obd);
3287
3288         ptlrpc_lprocfs_unregister_obd(obd);
3289         RETURN(0);
3290 }
3291
3292 int osc_cleanup_common(struct obd_device *obd)
3293 {
3294         struct client_obd *cli = &obd->u.cli;
3295         int rc;
3296
3297         ENTRY;
3298
3299         spin_lock(&osc_shrink_lock);
3300         list_del(&cli->cl_shrink_list);
3301         spin_unlock(&osc_shrink_lock);
3302
3303         /* lru cleanup */
3304         if (cli->cl_cache != NULL) {
3305                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3306                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3307                 list_del_init(&cli->cl_lru_osc);
3308                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3309                 cli->cl_lru_left = NULL;
3310                 cl_cache_decref(cli->cl_cache);
3311                 cli->cl_cache = NULL;
3312         }
3313
3314         /* free memory of osc quota cache */
3315         osc_quota_cleanup(obd);
3316
3317         rc = client_obd_cleanup(obd);
3318
3319         ptlrpcd_decref();
3320         RETURN(rc);
3321 }
3322 EXPORT_SYMBOL(osc_cleanup_common);
3323
3324 static const struct obd_ops osc_obd_ops = {
3325         .o_owner                = THIS_MODULE,
3326         .o_setup                = osc_setup,
3327         .o_precleanup           = osc_precleanup,
3328         .o_cleanup              = osc_cleanup_common,
3329         .o_add_conn             = client_import_add_conn,
3330         .o_del_conn             = client_import_del_conn,
3331         .o_connect              = client_connect_import,
3332         .o_reconnect            = osc_reconnect,
3333         .o_disconnect           = osc_disconnect,
3334         .o_statfs               = osc_statfs,
3335         .o_statfs_async         = osc_statfs_async,
3336         .o_create               = osc_create,
3337         .o_destroy              = osc_destroy,
3338         .o_getattr              = osc_getattr,
3339         .o_setattr              = osc_setattr,
3340         .o_iocontrol            = osc_iocontrol,
3341         .o_set_info_async       = osc_set_info_async,
3342         .o_import_event         = osc_import_event,
3343         .o_quotactl             = osc_quotactl,
3344 };
3345
3346 static struct shrinker *osc_cache_shrinker;
3347 LIST_HEAD(osc_shrink_list);
3348 DEFINE_SPINLOCK(osc_shrink_lock);
3349
3350 #ifndef HAVE_SHRINKER_COUNT
3351 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3352 {
3353         struct shrink_control scv = {
3354                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3355                 .gfp_mask   = shrink_param(sc, gfp_mask)
3356         };
3357         (void)osc_cache_shrink_scan(shrinker, &scv);
3358
3359         return osc_cache_shrink_count(shrinker, &scv);
3360 }
3361 #endif
3362
3363 static int __init osc_init(void)
3364 {
3365         unsigned int reqpool_size;
3366         unsigned int reqsize;
3367         int rc;
3368         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3369                          osc_cache_shrink_count, osc_cache_shrink_scan);
3370         ENTRY;
3371
3372         /* print the address of _any_ initialized kernel symbol from this
3373          * module, to allow debugging with a gdb that doesn't support data
3374          * symbols from modules. */
3375         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3376
3377         rc = lu_kmem_init(osc_caches);
3378         if (rc)
3379                 RETURN(rc);
3380
3381         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3382                                  LUSTRE_OSC_NAME, &osc_device_type);
3383         if (rc)
3384                 GOTO(out_kmem, rc);
3385
3386         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3387
3388         /* This is obviously too much memory; we only prevent overflow here */
3389         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3390                 GOTO(out_type, rc = -EINVAL);
3391
3392         reqpool_size = osc_reqpool_mem_max << 20;
3393
3394         reqsize = 1;
3395         while (reqsize < OST_IO_MAXREQSIZE)
3396                 reqsize = reqsize << 1;
3397
3398         /*
3399          * We don't enlarge the request count in the OSC pool according to
3400          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3401          * after a normal allocation has failed, so a small OSC pool won't
3402          * cause much performance degradation in most cases.
3403          */
3404         osc_reqpool_maxreqcount = reqpool_size / reqsize;
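        /*
         * A worked sizing example with hypothetical numbers: with
         * osc_reqpool_mem_max = 5, reqpool_size is 5 MiB; if the power of
         * two covering OST_IO_MAXREQSIZE were 64 KiB, the pool would be
         * capped at (5 << 20) / (64 << 10) = 80 requests in total.
         */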
3405
3406         atomic_set(&osc_pool_req_count, 0);
3407         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3408                                           ptlrpc_add_rqs_to_pool);
3409
3410         if (osc_rq_pool == NULL)
3411                 GOTO(out_type, rc = -ENOMEM);
3412
3413         rc = osc_start_grant_work();
3414         if (rc != 0)
3415                 GOTO(out_req_pool, rc);
3416
3417         RETURN(rc);
3418
3419 out_req_pool:
3420         ptlrpc_free_rq_pool(osc_rq_pool);
3421 out_type:
3422         class_unregister_type(LUSTRE_OSC_NAME);
3423 out_kmem:
3424         lu_kmem_fini(osc_caches);
3425
3426         RETURN(rc);
3427 }
3428
3429 static void __exit osc_exit(void)
3430 {
3431         osc_stop_grant_work();
3432         remove_shrinker(osc_cache_shrinker);
3433         class_unregister_type(LUSTRE_OSC_NAME);
3434         lu_kmem_fini(osc_caches);
3435         ptlrpc_free_rq_pool(osc_rq_pool);
3436 }
3437
3438 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3439 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3440 MODULE_VERSION(LUSTRE_VERSION_STRING);
3441 MODULE_LICENSE("GPL");
3442
3443 module_init(osc_init);
3444 module_exit(osc_exit);