lustre/osc/osc_request.c (fs/lustre-release.git, commit b4ed99b821f1c255366c0624ac51b8f0c142c1d5)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
54
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
58
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
61
62 #define osc_grant_args osc_brw_async_args
63
64 struct osc_setattr_args {
65         struct obdo             *sa_oa;
66         obd_enqueue_update_f     sa_upcall;
67         void                    *sa_cookie;
68 };
69
70 struct osc_fsync_args {
71         struct osc_object       *fa_obj;
72         struct obdo             *fa_oa;
73         obd_enqueue_update_f    fa_upcall;
74         void                    *fa_cookie;
75 };
76
77 struct osc_ladvise_args {
78         struct obdo             *la_oa;
79         obd_enqueue_update_f     la_upcall;
80         void                    *la_cookie;
81 };
82
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
85                          void *data, int rc);
86
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
88 {
89         struct ost_body *body;
90
91         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
92         LASSERT(body);
93
94         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
95 }
96
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
98                        struct obdo *oa)
99 {
100         struct ptlrpc_request   *req;
101         struct ost_body         *body;
102         int                      rc;
103
104         ENTRY;
105         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
106         if (req == NULL)
107                 RETURN(-ENOMEM);
108
109         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
110         if (rc) {
111                 ptlrpc_request_free(req);
112                 RETURN(rc);
113         }
114
115         osc_pack_req_body(req, oa);
116
117         ptlrpc_request_set_replen(req);
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
124         if (body == NULL)
125                 GOTO(out, rc = -EPROTO);
126
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
129
130         oa->o_blksize = cli_brw_size(exp->exp_obd);
131         oa->o_valid |= OBD_MD_FLBLKSZ;
132
133         EXIT;
134 out:
135         ptlrpc_req_finished(req);
136
137         return rc;
138 }
139
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
141                        struct obdo *oa)
142 {
143         struct ptlrpc_request   *req;
144         struct ost_body         *body;
145         int                      rc;
146
147         ENTRY;
148         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
149
150         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
151         if (req == NULL)
152                 RETURN(-ENOMEM);
153
154         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
155         if (rc) {
156                 ptlrpc_request_free(req);
157                 RETURN(rc);
158         }
159
160         osc_pack_req_body(req, oa);
161
162         ptlrpc_request_set_replen(req);
163
164         rc = ptlrpc_queue_wait(req);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
169         if (body == NULL)
170                 GOTO(out, rc = -EPROTO);
171
172         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
173
174         EXIT;
175 out:
176         ptlrpc_req_finished(req);
177
178         RETURN(rc);
179 }
180
181 static int osc_setattr_interpret(const struct lu_env *env,
182                                  struct ptlrpc_request *req, void *args, int rc)
183 {
184         struct osc_setattr_args *sa = args;
185         struct ost_body *body;
186
187         ENTRY;
188
189         if (rc != 0)
190                 GOTO(out, rc);
191
192         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193         if (body == NULL)
194                 GOTO(out, rc = -EPROTO);
195
196         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
197                              &body->oa);
198 out:
199         rc = sa->sa_upcall(sa->sa_cookie, rc);
200         RETURN(rc);
201 }
202
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204                       obd_enqueue_update_f upcall, void *cookie,
205                       struct ptlrpc_request_set *rqset)
206 {
207         struct ptlrpc_request   *req;
208         struct osc_setattr_args *sa;
209         int                      rc;
210
211         ENTRY;
212
213         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
214         if (req == NULL)
215                 RETURN(-ENOMEM);
216
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oa);
224
225         ptlrpc_request_set_replen(req);
226
227         /* do mds to ost setattr asynchronously */
228         if (!rqset) {
229                 /* Do not wait for response. */
230                 ptlrpcd_add_req(req);
231         } else {
232                 req->rq_interpret_reply = osc_setattr_interpret;
233
234                 sa = ptlrpc_req_async_args(sa, req);
235                 sa->sa_oa = oa;
236                 sa->sa_upcall = upcall;
237                 sa->sa_cookie = cookie;
238
239                 if (rqset == PTLRPCD_SET)
240                         ptlrpcd_add_req(req);
241                 else
242                         ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
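/* Illustrative usage sketch (not part of the original source): a caller
 * packs the attributes to change into an obdo and lets ptlrpcd complete
 * the RPC through the upcall.  "my_setattr_done", "my_setattr_example"
 * and "my_cookie" are hypothetical names. */
#if 0
static int my_setattr_done(void *cookie, int rc)
{
	/* invoked from osc_setattr_interpret() after the reply (or an
	 * error) has been processed; rc is the final result */
	return rc;
}

static int my_setattr_example(struct obd_export *exp, struct obdo *oa,
			      void *my_cookie)
{
	oa->o_valid |= OBD_MD_FLGROUP;	/* asserted by the setattr paths */
	return osc_setattr_async(exp, oa, my_setattr_done, my_cookie,
				 PTLRPCD_SET);
}
#endif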
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for the response; the upcall and cookie
271  * may also be NULL in that case.
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         if (rqset == PTLRPCD_SET)
328                 ptlrpcd_add_req(req);
329         else
330                 ptlrpc_set_add_req(rqset, req);
331
332         RETURN(0);
333 }
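/* Illustrative usage sketch (not part of the original source): issue a
 * single fire-and-forget advice covering the whole object.  With
 * rqset == NULL the RPC is queued to ptlrpcd and, as noted above, the
 * upcall and cookie may be NULL too.  The lla_* field names are an
 * assumption based on struct lu_ladvise; "my_ladvise_example" is a
 * hypothetical name. */
#if 0
static int my_ladvise_example(struct obd_export *exp, struct obdo *oa,
			      struct ladvise_hdr *hdr)
{
	hdr->lah_count = 1;
	hdr->lah_advise[0].lla_start = 0;		/* assumed field names */
	hdr->lah_advise[0].lla_end = OBD_OBJECT_EOF;
	return osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
}
#endif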
334
335 static int osc_create(const struct lu_env *env, struct obd_export *exp,
336                       struct obdo *oa)
337 {
338         struct ptlrpc_request *req;
339         struct ost_body       *body;
340         int                    rc;
341         ENTRY;
342
343         LASSERT(oa != NULL);
344         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
345         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
346
347         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
348         if (req == NULL)
349                 GOTO(out, rc = -ENOMEM);
350
351         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
352         if (rc) {
353                 ptlrpc_request_free(req);
354                 GOTO(out, rc);
355         }
356
357         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358         LASSERT(body);
359
360         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
361
362         ptlrpc_request_set_replen(req);
363
364         rc = ptlrpc_queue_wait(req);
365         if (rc)
366                 GOTO(out_req, rc);
367
368         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL)
370                 GOTO(out_req, rc = -EPROTO);
371
372         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
373         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
374
375         oa->o_blksize = cli_brw_size(exp->exp_obd);
376         oa->o_valid |= OBD_MD_FLBLKSZ;
377
378         CDEBUG(D_HA, "transno: %lld\n",
379                lustre_msg_get_transno(req->rq_repmsg));
380 out_req:
381         ptlrpc_req_finished(req);
382 out:
383         RETURN(rc);
384 }
385
386 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
387                    obd_enqueue_update_f upcall, void *cookie)
388 {
389         struct ptlrpc_request *req;
390         struct osc_setattr_args *sa;
391         struct obd_import *imp = class_exp2cliimp(exp);
392         struct ost_body *body;
393         int rc;
394
395         ENTRY;
396
397         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
398         if (req == NULL)
399                 RETURN(-ENOMEM);
400
401         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
402         if (rc < 0) {
403                 ptlrpc_request_free(req);
404                 RETURN(rc);
405         }
406
407         osc_set_io_portal(req);
408
409         ptlrpc_at_set_req_timeout(req);
410
411         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
412
413         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
414
415         ptlrpc_request_set_replen(req);
416
417         req->rq_interpret_reply = osc_setattr_interpret;
418         sa = ptlrpc_req_async_args(sa, req);
419         sa->sa_oa = oa;
420         sa->sa_upcall = upcall;
421         sa->sa_cookie = cookie;
422
423         ptlrpcd_add_req(req);
424
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(osc_punch_send);
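/* Illustrative usage sketch (not part of the original source): truncate
 * an object to "size" by punching from "size" to EOF.  Carrying the
 * punch range in o_size/o_blocks mirrors the start/end convention the
 * sync path documents below, stated here as an assumption. */
#if 0
static int my_punch_example(struct obd_export *exp, struct obdo *oa,
			    u64 size, obd_enqueue_update_f upcall,
			    void *cookie)
{
	oa->o_size = size;			/* punch start */
	oa->o_blocks = OBD_OBJECT_EOF;		/* punch end */
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLGROUP;
	return osc_punch_send(exp, oa, upcall, cookie);
}
#endif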
428
429 static int osc_sync_interpret(const struct lu_env *env,
430                               struct ptlrpc_request *req, void *args, int rc)
431 {
432         struct osc_fsync_args *fa = args;
433         struct ost_body *body;
434         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
435         unsigned long valid = 0;
436         struct cl_object *obj;
437         ENTRY;
438
439         if (rc != 0)
440                 GOTO(out, rc);
441
442         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
443         if (body == NULL) {
444                 CERROR("can't unpack ost_body\n");
445                 GOTO(out, rc = -EPROTO);
446         }
447
448         *fa->fa_oa = body->oa;
449         obj = osc2cl(fa->fa_obj);
450
451         /* Update osc object's blocks attribute */
452         cl_object_attr_lock(obj);
453         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
454                 attr->cat_blocks = body->oa.o_blocks;
455                 valid |= CAT_BLOCKS;
456         }
457
458         if (valid != 0)
459                 cl_object_attr_update(env, obj, attr, valid);
460         cl_object_attr_unlock(obj);
461
462 out:
463         rc = fa->fa_upcall(fa->fa_cookie, rc);
464         RETURN(rc);
465 }
466
467 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
468                   obd_enqueue_update_f upcall, void *cookie,
469                   struct ptlrpc_request_set *rqset)
470 {
471         struct obd_export     *exp = osc_export(obj);
472         struct ptlrpc_request *req;
473         struct ost_body       *body;
474         struct osc_fsync_args *fa;
475         int                    rc;
476         ENTRY;
477
478         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
479         if (req == NULL)
480                 RETURN(-ENOMEM);
481
482         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
483         if (rc) {
484                 ptlrpc_request_free(req);
485                 RETURN(rc);
486         }
487
488         /* overload the size and blocks fields in the oa with start/end */
489         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
490         LASSERT(body);
491         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
492
493         ptlrpc_request_set_replen(req);
494         req->rq_interpret_reply = osc_sync_interpret;
495
496         fa = ptlrpc_req_async_args(fa, req);
497         fa->fa_obj = obj;
498         fa->fa_oa = oa;
499         fa->fa_upcall = upcall;
500         fa->fa_cookie = cookie;
501
502         if (rqset == PTLRPCD_SET)
503                 ptlrpcd_add_req(req);
504         else
505                 ptlrpc_set_add_req(rqset, req);
506
507         RETURN(0);
508 }
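/* Illustrative usage sketch (not part of the original source): per the
 * comment above, the sync range travels in the size/blocks fields of
 * the obdo.  "my_sync_example" is a hypothetical name. */
#if 0
static int my_sync_example(struct osc_object *obj, struct obdo *oa,
			   u64 start, u64 end,
			   obd_enqueue_update_f upcall, void *cookie)
{
	oa->o_size = start;		/* range start */
	oa->o_blocks = end;		/* range end, e.g. OBD_OBJECT_EOF */
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
	return osc_sync_base(obj, oa, upcall, cookie, PTLRPCD_SET);
}
#endif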
509
510 /* Find and locally cancel the locks matching @mode on the resource derived
511  * from @oa. Found locks are added to the @cancels list. Returns the number
512  * of locks added to that list. */
513 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
514                                    struct list_head *cancels,
515                                    enum ldlm_mode mode, __u64 lock_flags)
516 {
517         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
518         struct ldlm_res_id res_id;
519         struct ldlm_resource *res;
520         int count;
521         ENTRY;
522
523         /* Return early, i.e. cancel nothing, only if ELC is supported (flag
524          * in the export) but has been disabled through procfs (flag in NS).
525          *
526          * This is distinct from the case where ELC was never supported, in
527          * which we still want to cancel locks in advance, just locally,
528          * without sending any RPC. */
529         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
530                 RETURN(0);
531
532         ostid_build_res_name(&oa->o_oi, &res_id);
533         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
534         if (IS_ERR(res))
535                 RETURN(0);
536
537         LDLM_RESOURCE_ADDREF(res);
538         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
539                                            lock_flags, 0, NULL);
540         LDLM_RESOURCE_DELREF(res);
541         ldlm_resource_putref(res);
542         RETURN(count);
543 }
544
545 static int osc_destroy_interpret(const struct lu_env *env,
546                                  struct ptlrpc_request *req, void *args, int rc)
547 {
548         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
549
550         atomic_dec(&cli->cl_destroy_in_flight);
551         wake_up(&cli->cl_destroy_waitq);
552
553         return 0;
554 }
555
556 static int osc_can_send_destroy(struct client_obd *cli)
557 {
558         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
559             cli->cl_max_rpcs_in_flight) {
560                 /* The destroy request can be sent */
561                 return 1;
562         }
563         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
564             cli->cl_max_rpcs_in_flight) {
565                 /*
566                  * The counter has been modified between the two atomic
567                  * operations.
568                  */
569                 wake_up(&cli->cl_destroy_waitq);
570         }
571         return 0;
572 }
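/* The inc-then-back-off pattern above, reduced to its skeleton as an
 * illustrative sketch (not part of the original source).  The wake_up()
 * after the decrement closes the window where another destroy completed
 * between our two atomic operations, which would otherwise lose a
 * wakeup for a waiter. */
#if 0
static int slot_try_get(atomic_t *in_flight, int max, wait_queue_head_t *wq)
{
	if (atomic_inc_return(in_flight) <= max)
		return 1;			/* slot acquired */
	if (atomic_dec_return(in_flight) < max)
		wake_up(wq);			/* racing completion: re-wake */
	return 0;				/* caller must wait */
}
#endif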
573
574 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
575                        struct obdo *oa)
576 {
577         struct client_obd     *cli = &exp->exp_obd->u.cli;
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         struct list_head       cancels = LIST_HEAD_INIT(cancels);
581         int rc, count;
582         ENTRY;
583
584         if (!oa) {
585                 CDEBUG(D_INFO, "oa NULL\n");
586                 RETURN(-EINVAL);
587         }
588
589         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
590                                         LDLM_FL_DISCARD_DATA);
591
592         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
593         if (req == NULL) {
594                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
595                 RETURN(-ENOMEM);
596         }
597
598         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
599                                0, &cancels, count);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
606         ptlrpc_at_set_req_timeout(req);
607
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
611
612         ptlrpc_request_set_replen(req);
613
614         req->rq_interpret_reply = osc_destroy_interpret;
615         if (!osc_can_send_destroy(cli)) {
616                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
617
618                 /*
619                  * Wait until the number of on-going destroy RPCs drops
620                  * below cl_max_rpcs_in_flight.
621                  */
622                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
623                                             osc_can_send_destroy(cli), &lwi);
624                 if (rc) {
625                         ptlrpc_req_finished(req);
626                         RETURN(rc);
627                 }
628         }
629
630         /* Do not wait for response */
631         ptlrpcd_add_req(req);
632         RETURN(0);
633 }
634
635 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
636                                 long writing_bytes)
637 {
638         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
639
640         LASSERT(!(oa->o_valid & bits));
641
642         oa->o_valid |= bits;
643         spin_lock(&cli->cl_loi_list_lock);
644         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
645                 oa->o_dirty = cli->cl_dirty_grant;
646         else
647                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
648         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
649                      cli->cl_dirty_max_pages)) {
650                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
651                        cli->cl_dirty_pages, cli->cl_dirty_transit,
652                        cli->cl_dirty_max_pages);
653                 oa->o_undirty = 0;
654         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
655                             atomic_long_read(&obd_dirty_transit_pages) >
656                             (long)(obd_max_dirty_pages + 1))) {
657                 /* The atomic_read() and atomic_inc() here are not covered
658                  * by a lock, so they may race and spuriously trip this
659                  * CERROR() unless we add a small fudge factor (+1). */
660                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
661                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
662                        atomic_long_read(&obd_dirty_transit_pages),
663                        obd_max_dirty_pages);
664                 oa->o_undirty = 0;
665         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
666                             0x7fffffff)) {
667                 CERROR("dirty %lu - dirty_max %lu too big???\n",
668                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
669                 oa->o_undirty = 0;
670         } else {
671                 unsigned long nrpages;
672                 unsigned long undirty;
673
674                 nrpages = cli->cl_max_pages_per_rpc;
675                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
676                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
677                 undirty = nrpages << PAGE_SHIFT;
678                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
679                                  GRANT_PARAM)) {
680                         int nrextents;
681
682                         /* take extent tax into account when asking for more
683                          * grant space */
684                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
685                                      cli->cl_max_extent_pages;
686                         undirty += nrextents * cli->cl_grant_extent_tax;
687                 }
688                 /* Do not ask for more than OBD_MAX_GRANT minus a margin
689                  * for the server to add extent tax, etc.
690                  */
691                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
692                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
693         }
694         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
695         oa->o_dropped = cli->cl_lost_grant;
696         cli->cl_lost_grant = 0;
697         spin_unlock(&cli->cl_loi_list_lock);
698         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
699                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
700 }
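/* Worked example with illustrative numbers (not from the original
 * source): with 4KiB pages, cl_max_pages_per_rpc = 256 (1MiB RPCs) and
 * cl_max_rpcs_in_flight = 8, the common case above asks for enough
 * grant to keep a full pipeline busy:
 *
 *	nrpages = 256 * (8 + 1) = 2304 pages
 *	undirty = 2304 << 12    = 9MiB
 *
 * plus one cl_grant_extent_tax per extent when GRANT_PARAM was
 * negotiated, all capped below OBD_MAX_GRANT minus a few RPC-sized
 * margins. */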
701
702 void osc_update_next_shrink(struct client_obd *cli)
703 {
704         cli->cl_next_shrink_grant = ktime_get_seconds() +
705                                     cli->cl_grant_shrink_interval;
706
707         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
708                cli->cl_next_shrink_grant);
709 }
710
711 static void __osc_update_grant(struct client_obd *cli, u64 grant)
712 {
713         spin_lock(&cli->cl_loi_list_lock);
714         cli->cl_avail_grant += grant;
715         spin_unlock(&cli->cl_loi_list_lock);
716 }
717
718 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
719 {
720         if (body->oa.o_valid & OBD_MD_FLGRANT) {
721                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
722                 __osc_update_grant(cli, body->oa.o_grant);
723         }
724 }
725
726 /**
727  * Grant thread data for shrinking grant space.
728  */
729 struct grant_thread_data {
730         struct list_head        gtd_clients;
731         struct mutex            gtd_mutex;
732         unsigned long           gtd_stopped:1;
733 };
734 static struct grant_thread_data client_gtd;
735
736 static int osc_shrink_grant_interpret(const struct lu_env *env,
737                                       struct ptlrpc_request *req,
738                                       void *args, int rc)
739 {
740         struct osc_grant_args *aa = args;
741         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
742         struct ost_body *body;
743
744         if (rc != 0) {
745                 __osc_update_grant(cli, aa->aa_oa->o_grant);
746                 GOTO(out, rc);
747         }
748
749         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
750         LASSERT(body);
751         osc_update_grant(cli, body);
752 out:
753         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
754
755         return rc;
756 }
757
758 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
759 {
760         spin_lock(&cli->cl_loi_list_lock);
761         oa->o_grant = cli->cl_avail_grant / 4;
762         cli->cl_avail_grant -= oa->o_grant;
763         spin_unlock(&cli->cl_loi_list_lock);
764         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
765                 oa->o_valid |= OBD_MD_FLFLAGS;
766                 oa->o_flags = 0;
767         }
768         oa->o_flags |= OBD_FL_SHRINK_GRANT;
769         osc_update_next_shrink(cli);
770 }
771
772 /* Shrink the current grant, either from some large amount to enough for a
773  * full set of in-flight RPCs, or, if we have already shrunk to that limit,
774  * to enough for a single RPC.  This avoids keeping more grant than needed
775  * and avoids shrinking the grant piecemeal. */
776 static int osc_shrink_grant(struct client_obd *cli)
777 {
778         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
779                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
780
781         spin_lock(&cli->cl_loi_list_lock);
782         if (cli->cl_avail_grant <= target_bytes)
783                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
784         spin_unlock(&cli->cl_loi_list_lock);
785
786         return osc_shrink_grant_to_target(cli, target_bytes);
787 }
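/* Worked example with illustrative numbers (not from the original
 * source): with 1MiB RPCs and 8 RPCs in flight, the first shrink
 * targets (8 + 1) * 1MiB = 9MiB; once avail_grant is already at or
 * below that, the target drops to a single RPC, 1MiB.  The grant is
 * thus returned in two large steps rather than piecemeal. */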
788
789 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
790 {
791         int                     rc = 0;
792         struct ost_body        *body;
793         ENTRY;
794
795         spin_lock(&cli->cl_loi_list_lock);
796         /* Don't shrink if we are already at or below the desired limit.
797          * We never want to shrink below a single RPC, as that would
798          * negatively impact block allocation and long-term performance. */
799         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
800                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
801
802         if (target_bytes >= cli->cl_avail_grant) {
803                 spin_unlock(&cli->cl_loi_list_lock);
804                 RETURN(0);
805         }
806         spin_unlock(&cli->cl_loi_list_lock);
807
808         OBD_ALLOC_PTR(body);
809         if (!body)
810                 RETURN(-ENOMEM);
811
812         osc_announce_cached(cli, &body->oa, 0);
813
814         spin_lock(&cli->cl_loi_list_lock);
815         if (target_bytes >= cli->cl_avail_grant) {
816                 /* available grant has changed since target calculation */
817                 spin_unlock(&cli->cl_loi_list_lock);
818                 GOTO(out_free, rc = 0);
819         }
820         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
821         cli->cl_avail_grant = target_bytes;
822         spin_unlock(&cli->cl_loi_list_lock);
823         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
824                 body->oa.o_valid |= OBD_MD_FLFLAGS;
825                 body->oa.o_flags = 0;
826         }
827         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
828         osc_update_next_shrink(cli);
829
830         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
831                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
832                                 sizeof(*body), body, NULL);
833         if (rc != 0)
834                 __osc_update_grant(cli, body->oa.o_grant);
835 out_free:
836         OBD_FREE_PTR(body);
837         RETURN(rc);
838 }
839
840 static int osc_should_shrink_grant(struct client_obd *client)
841 {
842         time64_t next_shrink = client->cl_next_shrink_grant;
843
844         if (client->cl_import == NULL)
845                 return 0;
846
847         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
848              OBD_CONNECT_GRANT_SHRINK) == 0)
849                 return 0;
850
851         if (ktime_get_seconds() >= next_shrink - 5) {
852                 /* Get the current RPC size directly, instead of going via:
853                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
854                  * Keep comment here so that it can be found by searching. */
855                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
856
857                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
858                     client->cl_avail_grant > brw_size)
859                         return 1;
860                 else
861                         osc_update_next_shrink(client);
862         }
863         return 0;
864 }
865
866 #define GRANT_SHRINK_RPC_BATCH  100
867
868 static struct delayed_work work;
869
870 static void osc_grant_work_handler(struct work_struct *data)
871 {
872         struct client_obd *cli;
873         int rpc_sent;
874         bool init_next_shrink = true;
875         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
876
877         rpc_sent = 0;
878         mutex_lock(&client_gtd.gtd_mutex);
879         list_for_each_entry(cli, &client_gtd.gtd_clients,
880                             cl_grant_chain) {
881                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
882                     osc_should_shrink_grant(cli)) {
883                         osc_shrink_grant(cli);
884                         rpc_sent++;
885                 }
886
887                 if (!init_next_shrink) {
888                         if (cli->cl_next_shrink_grant < next_shrink &&
889                             cli->cl_next_shrink_grant > ktime_get_seconds())
890                                 next_shrink = cli->cl_next_shrink_grant;
891                 } else {
892                         init_next_shrink = false;
893                         next_shrink = cli->cl_next_shrink_grant;
894                 }
895         }
896         mutex_unlock(&client_gtd.gtd_mutex);
897
898         if (client_gtd.gtd_stopped == 1)
899                 return;
900
901         if (next_shrink > ktime_get_seconds()) {
902                 time64_t delay = next_shrink - ktime_get_seconds();
903
904                 schedule_delayed_work(&work, cfs_time_seconds(delay));
905         } else {
906                 schedule_work(&work.work);
907         }
908 }
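/* Illustrative sketch (not part of the original source) of the
 * self-rescheduling delayed-work pattern used above: the handler does a
 * bounded batch of work, computes the next deadline, checks the stop
 * flag, and then re-arms itself.  "my_next_deadline" is a hypothetical
 * helper. */
#if 0
static void my_work_handler(struct work_struct *data)
{
	time64_t next = my_next_deadline();	/* hypothetical */

	if (client_gtd.gtd_stopped == 1)
		return;				/* osc_stop_grant_work() ran */
	if (next > ktime_get_seconds())
		schedule_delayed_work(&work,
				      cfs_time_seconds(next -
						       ktime_get_seconds()));
	else
		schedule_work(&work.work);	/* run again immediately */
}
#endif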
909
910 void osc_schedule_grant_work(void)
911 {
912         cancel_delayed_work_sync(&work);
913         schedule_work(&work.work);
914 }
915
916 /**
917  * Start grant work for returning grant to the server for idle clients.
918  */
919 static int osc_start_grant_work(void)
920 {
921         client_gtd.gtd_stopped = 0;
922         mutex_init(&client_gtd.gtd_mutex);
923         INIT_LIST_HEAD(&client_gtd.gtd_clients);
924
925         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
926         schedule_work(&work.work);
927
928         return 0;
929 }
930
931 static void osc_stop_grant_work(void)
932 {
933         client_gtd.gtd_stopped = 1;
934         cancel_delayed_work_sync(&work);
935 }
936
937 static void osc_add_grant_list(struct client_obd *client)
938 {
939         mutex_lock(&client_gtd.gtd_mutex);
940         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
941         mutex_unlock(&client_gtd.gtd_mutex);
942 }
943
944 static void osc_del_grant_list(struct client_obd *client)
945 {
946         if (list_empty(&client->cl_grant_chain))
947                 return;
948
949         mutex_lock(&client_gtd.gtd_mutex);
950         list_del_init(&client->cl_grant_chain);
951         mutex_unlock(&client_gtd.gtd_mutex);
952 }
953
954 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
955 {
956         /*
957          * ocd_grant is the total grant amount we expect to hold: if we have
958          * been evicted, it is the new avail_grant amount, and cl_dirty_pages
959          * will drop to 0 as in-flight RPCs fail out; otherwise, it is
960          * avail_grant + dirty.
961          *
962          * A race is tolerable here: if we were evicted but imp_state already
963          * left the EVICTED state, then cl_dirty_pages must be 0 already.
964          */
965         spin_lock(&cli->cl_loi_list_lock);
966         cli->cl_avail_grant = ocd->ocd_grant;
967         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
968                 cli->cl_avail_grant -= cli->cl_reserved_grant;
969                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
970                         cli->cl_avail_grant -= cli->cl_dirty_grant;
971                 else
972                         cli->cl_avail_grant -=
973                                         cli->cl_dirty_pages << PAGE_SHIFT;
974         }
975
976         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
977                 u64 size;
978                 int chunk_mask;
979
980                 /* overhead for each extent insertion */
981                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
982                 /* determine the appropriate chunk size used by osc_extent. */
983                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
984                                           ocd->ocd_grant_blkbits);
985                 /* max_pages_per_rpc must be chunk aligned */
986                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
987                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
988                                              ~chunk_mask) & chunk_mask;
989                 /* determine maximum extent size, in #pages */
990                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
991                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
992                 if (cli->cl_max_extent_pages == 0)
993                         cli->cl_max_extent_pages = 1;
994         } else {
995                 cli->cl_grant_extent_tax = 0;
996                 cli->cl_chunkbits = PAGE_SHIFT;
997                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
998         }
999         spin_unlock(&cli->cl_loi_list_lock);
1000
1001         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
1002                 "chunk bits: %d cl_max_extent_pages: %d\n",
1003                 cli_name(cli),
1004                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1005                 cli->cl_max_extent_pages);
1006
1007         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1008                 osc_add_grant_list(cli);
1009 }
1010 EXPORT_SYMBOL(osc_init_grant);
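/* Worked example with illustrative numbers (not from the original
 * source): with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16,
 * cl_chunkbits becomes 16, i.e. a 16-page chunk, so
 * chunk_mask = ~((1 << 4) - 1) = ~15, and a cl_max_pages_per_rpc of,
 * say, 250 is rounded up to (250 + 15) & ~15 = 256, keeping RPCs
 * chunk aligned. */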
1011
1012 /* We assume the reason this OSC got a short read is that it read beyond
1013  * the end of a stripe file; i.e. Lustre is reading a sparse file via the
1014  * LOV, and it _knows_ it is reading inside the file; it is just that this
1015  * stripe was never written at or beyond this offset. */
1016 static void handle_short_read(int nob_read, size_t page_count,
1017                               struct brw_page **pga)
1018 {
1019         char *ptr;
1020         int i = 0;
1021
1022         /* skip bytes read OK */
1023         while (nob_read > 0) {
1024                 LASSERT(page_count > 0);
1025
1026                 if (pga[i]->count > nob_read) {
1027                         /* EOF inside this page */
1028                         ptr = kmap(pga[i]->pg) +
1029                                 (pga[i]->off & ~PAGE_MASK);
1030                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1031                         kunmap(pga[i]->pg);
1032                         page_count--;
1033                         i++;
1034                         break;
1035                 }
1036
1037                 nob_read -= pga[i]->count;
1038                 page_count--;
1039                 i++;
1040         }
1041
1042         /* zero remaining pages */
1043         while (page_count-- > 0) {
1044                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1045                 memset(ptr, 0, pga[i]->count);
1046                 kunmap(pga[i]->pg);
1047                 i++;
1048         }
1049 }
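/* Worked example with illustrative numbers (not from the original
 * source): a 3-page read (4KiB pages) that returns nob_read = 6144
 * leaves page 0 untouched (fully read), zeroes bytes 2048..4095 of
 * page 1 (EOF landed inside it), and zeroes all of page 2. */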
1050
1051 static int check_write_rcs(struct ptlrpc_request *req,
1052                            int requested_nob, int niocount,
1053                            size_t page_count, struct brw_page **pga)
1054 {
1055         int     i;
1056         __u32   *remote_rcs;
1057
1058         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1059                                                   sizeof(*remote_rcs) *
1060                                                   niocount);
1061         if (remote_rcs == NULL) {
1062                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1063                 return(-EPROTO);
1064         }
1065
1066         /* return error if any niobuf was in error */
1067         for (i = 0; i < niocount; i++) {
1068                 if ((int)remote_rcs[i] < 0) {
1069                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1070                                i, remote_rcs[i], req);
1071                         return remote_rcs[i];
1072                 }
1073
1074                 if (remote_rcs[i] != 0) {
1075                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1076                                 i, remote_rcs[i], req);
1077                         return(-EPROTO);
1078                 }
1079         }
1080         if (req->rq_bulk != NULL &&
1081             req->rq_bulk->bd_nob_transferred != requested_nob) {
1082                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1083                        req->rq_bulk->bd_nob_transferred, requested_nob);
1084                 return(-EPROTO);
1085         }
1086
1087         return (0);
1088 }
1089
1090 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1091 {
1092         if (p1->flag != p2->flag) {
1093                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1094                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1095                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1096
1097                 /* warn if we try to combine flags that we don't know to be
1098                  * safe to combine */
1099                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1100                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1101                               "report this at https://jira.whamcloud.com/\n",
1102                               p1->flag, p2->flag);
1103                 }
1104                 return 0;
1105         }
1106
1107         return (p1->off + p1->count == p2->off);
1108 }
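/* Worked example (illustrative, not from the original source): two
 * 4KiB pages at offsets 0 and 4096 with identical flags merge into one
 * 8KiB niobuf, since p1->off + p1->count == p2->off.  Any flag
 * difference keeps the pages separate; differences outside the mask
 * above additionally trigger the CWARN. */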
1109
1110 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1111 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1112                                    size_t pg_count, struct brw_page **pga,
1113                                    int opc, obd_dif_csum_fn *fn,
1114                                    int sector_size,
1115                                    u32 *check_sum)
1116 {
1117         struct ahash_request *req;
1118         /* Use Adler as the default checksum type on top of the DIF tags */
1119         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1120         struct page *__page;
1121         unsigned char *buffer;
1122         __u16 *guard_start;
1123         unsigned int bufsize;
1124         int guard_number;
1125         int used_number = 0;
1126         int used;
1127         u32 cksum;
1128         int rc = 0;
1129         int i = 0;
1130
1131         LASSERT(pg_count > 0);
1132
1133         __page = alloc_page(GFP_KERNEL);
1134         if (__page == NULL)
1135                 return -ENOMEM;
1136
1137         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1138         if (IS_ERR(req)) {
1139                 rc = PTR_ERR(req);
1140                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1141                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1142                 GOTO(out, rc);
1143         }
1144
1145         buffer = kmap(__page);
1146         guard_start = (__u16 *)buffer;
1147         guard_number = PAGE_SIZE / sizeof(*guard_start);
1148         while (nob > 0 && pg_count > 0) {
1149                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1150
1151                 /* corrupt the data before we compute the checksum, to
1152                  * simulate an OST->client data error */
1153                 if (unlikely(i == 0 && opc == OST_READ &&
1154                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1155                         unsigned char *ptr = kmap(pga[i]->pg);
1156                         int off = pga[i]->off & ~PAGE_MASK;
1157
1158                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1159                         kunmap(pga[i]->pg);
1160                 }
1161
1162                 /*
1163                  * The remaining guard slots must be able to hold the
1164                  * checksums of a whole page.
1165                  */
1166                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1167                                                   pga[i]->off & ~PAGE_MASK,
1168                                                   count,
1169                                                   guard_start + used_number,
1170                                                   guard_number - used_number,
1171                                                   &used, sector_size,
1172                                                   fn);
1173                 if (rc)
1174                         break;
1175
1176                 used_number += used;
1177                 if (used_number == guard_number) {
1178                         cfs_crypto_hash_update_page(req, __page, 0,
1179                                 used_number * sizeof(*guard_start));
1180                         used_number = 0;
1181                 }
1182
1183                 nob -= pga[i]->count;
1184                 pg_count--;
1185                 i++;
1186         }
1187         kunmap(__page);
1188         if (rc)
1189                 GOTO(out, rc);
1190
1191         if (used_number != 0)
1192                 cfs_crypto_hash_update_page(req, __page, 0,
1193                         used_number * sizeof(*guard_start));
1194
1195         bufsize = sizeof(cksum);
1196         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1197
1198         /* When sending, we only compute a wrong checksum instead of
1199          * corrupting the data, so it is still correct on a resend */
1200         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1201                 cksum++;
1202
1203         *check_sum = cksum;
1204 out:
1205         __free_page(__page);
1206         return rc;
1207 }
1208 #else /* !CONFIG_CRC_T10DIF */
1209 #define obd_dif_ip_fn NULL
1210 #define obd_dif_crc_fn NULL
1211 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1212         -EOPNOTSUPP
1213 #endif /* CONFIG_CRC_T10DIF */
1214
1215 static int osc_checksum_bulk(int nob, size_t pg_count,
1216                              struct brw_page **pga, int opc,
1217                              enum cksum_types cksum_type,
1218                              u32 *cksum)
1219 {
1220         int                             i = 0;
1221         struct ahash_request           *req;
1222         unsigned int                    bufsize;
1223         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1224
1225         LASSERT(pg_count > 0);
1226
1227         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1228         if (IS_ERR(req)) {
1229                 CERROR("Unable to initialize checksum hash %s\n",
1230                        cfs_crypto_hash_name(cfs_alg));
1231                 return PTR_ERR(req);
1232         }
1233
1234         while (nob > 0 && pg_count > 0) {
1235                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1236
1237                 /* corrupt the data before we compute the checksum, to
1238                  * simulate an OST->client data error */
1239                 if (i == 0 && opc == OST_READ &&
1240                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1241                         unsigned char *ptr = kmap(pga[i]->pg);
1242                         int off = pga[i]->off & ~PAGE_MASK;
1243
1244                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1245                         kunmap(pga[i]->pg);
1246                 }
1247                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1248                                             pga[i]->off & ~PAGE_MASK,
1249                                             count);
1250                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1251                                (int)(pga[i]->off & ~PAGE_MASK));
1252
1253                 nob -= pga[i]->count;
1254                 pg_count--;
1255                 i++;
1256         }
1257
1258         bufsize = sizeof(*cksum);
1259         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1260
1261         /* When sending, we only compute a wrong checksum instead of
1262          * corrupting the data, so it is still correct on a resend */
1263         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1264                 (*cksum)++;
1265
1266         return 0;
1267 }
1268
1269 static int osc_checksum_bulk_rw(const char *obd_name,
1270                                 enum cksum_types cksum_type,
1271                                 int nob, size_t pg_count,
1272                                 struct brw_page **pga, int opc,
1273                                 u32 *check_sum)
1274 {
1275         obd_dif_csum_fn *fn = NULL;
1276         int sector_size = 0;
1277         int rc;
1278
1279         ENTRY;
1280         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1281
1282         if (fn)
1283                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1284                                              opc, fn, sector_size, check_sum);
1285         else
1286                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1287                                        check_sum);
1288
1289         RETURN(rc);
1290 }
1291
1292 static int
1293 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1294                      u32 page_count, struct brw_page **pga,
1295                      struct ptlrpc_request **reqp, int resend)
1296 {
1297         struct ptlrpc_request   *req;
1298         struct ptlrpc_bulk_desc *desc;
1299         struct ost_body         *body;
1300         struct obd_ioobj        *ioobj;
1301         struct niobuf_remote    *niobuf;
1302         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1303         struct osc_brw_async_args *aa;
1304         struct req_capsule      *pill;
1305         struct brw_page *pg_prev;
1306         void *short_io_buf;
1307         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1308
1309         ENTRY;
1310         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1311                 RETURN(-ENOMEM); /* Recoverable */
1312         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1313                 RETURN(-EINVAL); /* Fatal */
1314
1315         if ((cmd & OBD_BRW_WRITE) != 0) {
1316                 opc = OST_WRITE;
1317                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1318                                                 osc_rq_pool,
1319                                                 &RQF_OST_BRW_WRITE);
1320         } else {
1321                 opc = OST_READ;
1322                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1323         }
1324         if (req == NULL)
1325                 RETURN(-ENOMEM);
1326
1327         for (niocount = i = 1; i < page_count; i++) {
1328                 if (!can_merge_pages(pga[i - 1], pga[i]))
1329                         niocount++;
1330         }
1331
1332         pill = &req->rq_pill;
1333         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1334                              sizeof(*ioobj));
1335         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1336                              niocount * sizeof(*niobuf));
1337
1338         for (i = 0; i < page_count; i++)
1339                 short_io_size += pga[i]->count;
1340
1341         /* Check if read/write is small enough to be a short io. */
1342         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1343             !imp_connect_shortio(cli->cl_import))
1344                 short_io_size = 0;
1345
1346         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1347                              opc == OST_READ ? 0 : short_io_size);
1348         if (opc == OST_READ)
1349                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1350                                      short_io_size);
1351
1352         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1353         if (rc) {
1354                 ptlrpc_request_free(req);
1355                 RETURN(rc);
1356         }
1357         osc_set_io_portal(req);
1358
1359         ptlrpc_at_set_req_timeout(req);
1360         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1361          * retry logic */
1362         req->rq_no_retry_einprogress = 1;
1363
1364         if (short_io_size != 0) {
1365                 desc = NULL;
1366                 short_io_buf = NULL;
1367                 goto no_bulk;
1368         }
1369
1370         desc = ptlrpc_prep_bulk_imp(req, page_count,
1371                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1372                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1373                         PTLRPC_BULK_PUT_SINK) |
1374                         PTLRPC_BULK_BUF_KIOV,
1375                 OST_BULK_PORTAL,
1376                 &ptlrpc_bulk_kiov_pin_ops);
1377
1378         if (desc == NULL)
1379                 GOTO(out, rc = -ENOMEM);
1380         /* NB: the request now owns desc and will free it when the request is freed */
1381 no_bulk:
1382         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1383         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1384         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1385         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1386
1387         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1388
1389         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1390          * and from_kgid(), because they are asynchronous. Fortunately, the oa
1391          * variable contains valid o_uid and o_gid for these two operations.
1392          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1393          * OBD_MD_FLUID and OBD_MD_FLGID are not set, in order to avoid breaking
1394          * other processing logic */
1395         body->oa.o_uid = oa->o_uid;
1396         body->oa.o_gid = oa->o_gid;
1397
1398         obdo_to_ioobj(oa, ioobj);
1399         ioobj->ioo_bufcnt = niocount;
1400         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1401          * of bulks that might be sent for this request.  The actual number is
1402          * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
1403          * sends "max - 1" for compatibility with old clients that send "0", and
1404          * so that the actual maximum is a power-of-two number, not one less. LU-1431 */
1405         if (desc != NULL)
1406                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1407         else /* short io */
1408                 ioobj_max_brw_set(ioobj, 0);
1409
1410         if (short_io_size != 0) {
1411                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1412                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1413                         body->oa.o_flags = 0;
1414                 }
1415                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1416                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1417                        short_io_size);
1418                 if (opc == OST_WRITE) {
1419                         short_io_buf = req_capsule_client_get(pill,
1420                                                               &RMF_SHORT_IO);
1421                         LASSERT(short_io_buf != NULL);
1422                 }
1423         }
1424
1425         LASSERT(page_count > 0);
1426         pg_prev = pga[0];
1427         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1428                 struct brw_page *pg = pga[i];
1429                 int poff = pg->off & ~PAGE_MASK;
1430
1431                 LASSERT(pg->count > 0);
1432                 /* make sure there is no gap in the middle of the page array */
1433                 LASSERTF(page_count == 1 ||
1434                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1435                           ergo(i > 0 && i < page_count - 1,
1436                                poff == 0 && pg->count == PAGE_SIZE)   &&
1437                           ergo(i == page_count - 1, poff == 0)),
1438                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1439                          i, page_count, pg, pg->off, pg->count);
1440                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1441                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1442                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1443                          i, page_count,
1444                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1445                          pg_prev->pg, page_private(pg_prev->pg),
1446                          pg_prev->pg->index, pg_prev->off);
1447                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1448                         (pg->flag & OBD_BRW_SRVLOCK));
1449                 if (short_io_size != 0 && opc == OST_WRITE) {
1450                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1451
1452                         LASSERT(short_io_size >= requested_nob + pg->count);
1453                         memcpy(short_io_buf + requested_nob,
1454                                ptr + poff,
1455                                pg->count);
1456                         ll_kunmap_atomic(ptr, KM_USER0);
1457                 } else if (short_io_size == 0) {
1458                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1459                                                          pg->count);
1460                 }
1461                 requested_nob += pg->count;
1462
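                /* Illustration (hypothetical layout): two adjacent 4 KiB
                 * pages at file offsets 0 and 4096 with identical brw flags
                 * are merged below into one remote niobuf covering [0, 8191],
                 * so the server sees a single contiguous region. */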
1463                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1464                         niobuf--;
1465                         niobuf->rnb_len += pg->count;
1466                 } else {
1467                         niobuf->rnb_offset = pg->off;
1468                         niobuf->rnb_len    = pg->count;
1469                         niobuf->rnb_flags  = pg->flag;
1470                 }
1471                 pg_prev = pg;
1472         }
1473
1474         LASSERTF((void *)(niobuf - niocount) ==
1475                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1476                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1477                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1478
1479         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1480         if (resend) {
1481                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1482                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1483                         body->oa.o_flags = 0;
1484                 }
1485                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1486         }
1487
1488         if (osc_should_shrink_grant(cli))
1489                 osc_shrink_grant_local(cli, &body->oa);
1490
1491         /* size[REQ_REC_OFF] still sizeof (*body) */
1492         if (opc == OST_WRITE) {
1493                 if (cli->cl_checksum &&
1494                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1495                         /* store cl_cksum_type in a local variable since
1496                          * it can be changed via lprocfs */
1497                         enum cksum_types cksum_type = cli->cl_cksum_type;
1498
1499                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1500                                 body->oa.o_flags = 0;
1501
1502                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1503                                                                 cksum_type);
1504                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1505
1506                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1507                                                   requested_nob, page_count,
1508                                                   pga, OST_WRITE,
1509                                                   &body->oa.o_cksum);
1510                         if (rc < 0) {
1511                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1512                                        rc);
1513                                 GOTO(out, rc);
1514                         }
1515                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1516                                body->oa.o_cksum);
1517
1518                         /* save this in 'oa', too, for later checking */
1519                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1520                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1521                                                            cksum_type);
1522                 } else {
1523                         /* clear out the checksum flag, in case this is a
1524                          * resend but cl_checksum is no longer set. b=11238 */
1525                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1526                 }
1527                 oa->o_cksum = body->oa.o_cksum;
1528                 /* 1 RC per niobuf */
1529                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1530                                      sizeof(__u32) * niocount);
1531         } else {
1532                 if (cli->cl_checksum &&
1533                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1534                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1535                                 body->oa.o_flags = 0;
1536                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1537                                 cli->cl_cksum_type);
1538                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1539                 }
1540
1541                 /* The client cksum has already been copied to the wire obdo by
1542                  * the earlier lustre_set_wire_obdo(), so in case a bulk read is
1543                  * being resent due to a cksum error, this will allow the server
1544                  * to check+dump the pages on its side */
1545         }
1546         ptlrpc_request_set_replen(req);
1547
1548         aa = ptlrpc_req_async_args(aa, req);
1549         aa->aa_oa = oa;
1550         aa->aa_requested_nob = requested_nob;
1551         aa->aa_nio_count = niocount;
1552         aa->aa_page_count = page_count;
1553         aa->aa_resends = 0;
1554         aa->aa_ppga = pga;
1555         aa->aa_cli = cli;
1556         INIT_LIST_HEAD(&aa->aa_oaps);
1557
1558         *reqp = req;
1559         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1560         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1561                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1562                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1563         RETURN(0);
1564
1565  out:
1566         ptlrpc_req_finished(req);
1567         RETURN(rc);
1568 }
1569
1570 char dbgcksum_file_name[PATH_MAX];
1571
1572 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1573                                 struct brw_page **pga, __u32 server_cksum,
1574                                 __u32 client_cksum)
1575 {
1576         struct file *filp;
1577         int rc, i;
1578         unsigned int len;
1579         char *buf;
1580
1581         /* will only keep a dump of the pages on the first error for the same
1582          * range in the file/fid, not during resends/retries. */
1583         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1584                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1585                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1586                   libcfs_debug_file_path_arr :
1587                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1591                  pga[0]->off,
1592                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1593                  client_cksum, server_cksum);
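        /* The resulting name might look like (hypothetical values):
         *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-a1b2c3d4-e5f60708
         * i.e. debug path, parent FID, byte range, client and server cksums. */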
1594         filp = filp_open(dbgcksum_file_name,
1595                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1596         if (IS_ERR(filp)) {
1597                 rc = PTR_ERR(filp);
1598                 if (rc == -EEXIST)
1599                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1600                                "checksum error: rc = %d\n", dbgcksum_file_name,
1601                                rc);
1602                 else
1603                         CERROR("%s: can't open to dump pages with checksum "
1604                                "error: rc = %d\n", dbgcksum_file_name, rc);
1605                 return;
1606         }
1607
1608         for (i = 0; i < page_count; i++) {
1609                 len = pga[i]->count;
1610                 buf = kmap(pga[i]->pg);
1611                 while (len != 0) {
1612                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1613                         if (rc < 0) {
1614                                 CERROR("%s: wanted to write %u but got "
1615                                        "error %d\n", dbgcksum_file_name, len, rc);
1616                                 break;
1617                         }
1618                         len -= rc;
1619                         buf += rc;
1620                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1621                                dbgcksum_file_name, rc);
1622                 }
1623                 kunmap(pga[i]->pg);
1624         }
1625
1626         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1627         if (rc)
1628                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1629         filp_close(filp, NULL);
1630         return;
1631 }
1632
1633 static int
1634 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1635                      __u32 client_cksum, __u32 server_cksum,
1636                      struct osc_brw_async_args *aa)
1637 {
1638         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1639         enum cksum_types cksum_type;
1640         obd_dif_csum_fn *fn = NULL;
1641         int sector_size = 0;
1642         __u32 new_cksum;
1643         char *msg;
1644         int rc;
1645
1646         if (server_cksum == client_cksum) {
1647                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1648                 return 0;
1649         }
1650
1651         if (aa->aa_cli->cl_checksum_dump)
1652                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1653                                     server_cksum, client_cksum);
1654
1655         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1656                                            oa->o_flags : 0);
1657
1658         switch (cksum_type) {
1659         case OBD_CKSUM_T10IP512:
1660                 fn = obd_dif_ip_fn;
1661                 sector_size = 512;
1662                 break;
1663         case OBD_CKSUM_T10IP4K:
1664                 fn = obd_dif_ip_fn;
1665                 sector_size = 4096;
1666                 break;
1667         case OBD_CKSUM_T10CRC512:
1668                 fn = obd_dif_crc_fn;
1669                 sector_size = 512;
1670                 break;
1671         case OBD_CKSUM_T10CRC4K:
1672                 fn = obd_dif_crc_fn;
1673                 sector_size = 4096;
1674                 break;
1675         default:
1676                 break;
1677         }
1678
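        /* The T10-PI cases above pair a guard-tag function (IP checksum or
         * CRC) with a protection-interval size of 512 or 4096 bytes; any
         * other checksum type leaves fn == NULL and falls through to the
         * generic bulk checksum below. */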
1679         if (fn)
1680                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1681                                              aa->aa_page_count, aa->aa_ppga,
1682                                              OST_WRITE, fn, sector_size,
1683                                              &new_cksum);
1684         else
1685                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1686                                        aa->aa_ppga, OST_WRITE, cksum_type,
1687                                        &new_cksum);
1688
1689         if (rc < 0)
1690                 msg = "failed to calculate the client write checksum";
1691         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1692                 msg = "the server did not use the checksum type specified in "
1693                       "the original request - likely a protocol problem";
1694         else if (new_cksum == server_cksum)
1695                 msg = "changed on the client after we checksummed it - "
1696                       "likely false positive due to mmap IO (bug 11742)";
1697         else if (new_cksum == client_cksum)
1698                 msg = "changed in transit before arrival at OST";
1699         else
1700                 msg = "changed in transit AND doesn't match the original - "
1701                       "likely false positive due to mmap IO (bug 11742)";
1702
1703         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1704                            DFID " object "DOSTID" extent [%llu-%llu], original "
1705                            "client csum %x (type %x), server csum %x (type %x),"
1706                            " client csum now %x\n",
1707                            obd_name, msg, libcfs_nid2str(peer->nid),
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1711                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1712                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1713                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1714                            client_cksum,
1715                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1716                            server_cksum, cksum_type, new_cksum);
1717         return 1;
1718 }
1719
1720 /* Note rc enters this function as the number of bytes transferred */
1721 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1722 {
1723         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1724         struct client_obd *cli = aa->aa_cli;
1725         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1726         const struct lnet_process_id *peer =
1727                 &req->rq_import->imp_connection->c_peer;
1728         struct ost_body *body;
1729         u32 client_cksum = 0;
1730         ENTRY;
1731
1732         if (rc < 0 && rc != -EDQUOT) {
1733                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1734                 RETURN(rc);
1735         }
1736
1737         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1738         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1739         if (body == NULL) {
1740                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1741                 RETURN(-EPROTO);
1742         }
1743
1744         /* set/clear over quota flag for a uid/gid/projid */
1745         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1746             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1747                 unsigned qid[LL_MAXQUOTAS] = {
1748                                          body->oa.o_uid, body->oa.o_gid,
1749                                          body->oa.o_projid };
1750                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1751                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1752                        body->oa.o_valid, body->oa.o_flags);
1753                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1754                                 body->oa.o_flags);
1755         }
1756
1757         osc_update_grant(cli, body);
1758
1759         if (rc < 0)
1760                 RETURN(rc);
1761
1762         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1763                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1764
1765         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1766                 if (rc > 0) {
1767                         CERROR("Unexpected +ve rc %d\n", rc);
1768                         RETURN(-EPROTO);
1769                 }
1770
1771                 if (req->rq_bulk != NULL &&
1772                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1773                         RETURN(-EAGAIN);
1774
1775                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1776                     check_write_checksum(&body->oa, peer, client_cksum,
1777                                          body->oa.o_cksum, aa))
1778                         RETURN(-EAGAIN);
1779
1780                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1781                                      aa->aa_page_count, aa->aa_ppga);
1782                 GOTO(out, rc);
1783         }
1784
1785         /* The rest of this function executes only for OST_READs */
1786
1787         if (req->rq_bulk == NULL) {
1788                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1789                                           RCL_SERVER);
1790                 LASSERT(rc == req->rq_status);
1791         } else {
1792                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1793                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1794         }
1795         if (rc < 0)
1796                 GOTO(out, rc = -EAGAIN);
1797
1798         if (rc > aa->aa_requested_nob) {
1799                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1800                        aa->aa_requested_nob);
1801                 RETURN(-EPROTO);
1802         }
1803
1804         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1805                 CERROR("Unexpected rc %d (%d transferred)\n",
1806                        rc, req->rq_bulk->bd_nob_transferred);
1807                 RETURN(-EPROTO);
1808         }
1809
1810         if (req->rq_bulk == NULL) {
1811                 /* short io: reply data is inline in the reply buffer, not via bulk */
1812                 int nob, pg_count, i = 0;
1813                 unsigned char *buf;
1814
1815                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1816                 pg_count = aa->aa_page_count;
1817                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1818                                                    rc);
1819                 nob = rc;
1820                 while (nob > 0 && pg_count > 0) {
1821                         unsigned char *ptr;
1822                         int count = aa->aa_ppga[i]->count > nob ?
1823                                     nob : aa->aa_ppga[i]->count;
1824
1825                         CDEBUG(D_CACHE, "page %p count %d\n",
1826                                aa->aa_ppga[i]->pg, count);
1827                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1828                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1829                                count);
1830                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1831
1832                         buf += count;
1833                         nob -= count;
1834                         i++;
1835                         pg_count--;
1836                 }
1837         }
1838
1839         if (rc < aa->aa_requested_nob)
1840                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1841
1842         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1843                 static int cksum_counter;
1844                 u32        server_cksum = body->oa.o_cksum;
1845                 char      *via = "";
1846                 char      *router = "";
1847                 enum cksum_types cksum_type;
1848                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1849                         body->oa.o_flags : 0;
1850
1851                 cksum_type = obd_cksum_type_unpack(o_flags);
1852                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1853                                           aa->aa_page_count, aa->aa_ppga,
1854                                           OST_READ, &client_cksum);
1855                 if (rc < 0)
1856                         GOTO(out, rc);
1857
1858                 if (req->rq_bulk != NULL &&
1859                     peer->nid != req->rq_bulk->bd_sender) {
1860                         via = " via ";
1861                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1862                 }
1863
1864                 if (server_cksum != client_cksum) {
1865                         struct ost_body *clbody;
1866                         u32 page_count = aa->aa_page_count;
1867
1868                         clbody = req_capsule_client_get(&req->rq_pill,
1869                                                         &RMF_OST_BODY);
1870                         if (cli->cl_checksum_dump)
1871                                 dump_all_bulk_pages(&clbody->oa, page_count,
1872                                                     aa->aa_ppga, server_cksum,
1873                                                     client_cksum);
1874
1875                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1876                                            "%s%s%s inode "DFID" object "DOSTID
1877                                            " extent [%llu-%llu], client %x, "
1878                                            "server %x, cksum_type %x\n",
1879                                            obd_name,
1880                                            libcfs_nid2str(peer->nid),
1881                                            via, router,
1882                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1883                                                 clbody->oa.o_parent_seq : 0ULL,
1884                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1885                                                 clbody->oa.o_parent_oid : 0,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_ver : 0,
1888                                            POSTID(&body->oa.o_oi),
1889                                            aa->aa_ppga[0]->off,
1890                                            aa->aa_ppga[page_count-1]->off +
1891                                            aa->aa_ppga[page_count-1]->count - 1,
1892                                            client_cksum, server_cksum,
1893                                            cksum_type);
1894                         cksum_counter = 0;
1895                         aa->aa_oa->o_cksum = client_cksum;
1896                         rc = -EAGAIN;
1897                 } else {
1898                         cksum_counter++;
1899                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1900                         rc = 0;
1901                 }
1902         } else if (unlikely(client_cksum)) {
1903                 static int cksum_missed;
1904
1905                 cksum_missed++;
1906                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1907                         CERROR("Checksum %u requested from %s but not sent\n",
1908                                cksum_missed, libcfs_nid2str(peer->nid));
1909         } else {
1910                 rc = 0;
1911         }
1912 out:
1913         if (rc >= 0)
1914                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1915                                      aa->aa_oa, &body->oa);
1916
1917         RETURN(rc);
1918 }
1919
1920 static int osc_brw_redo_request(struct ptlrpc_request *request,
1921                                 struct osc_brw_async_args *aa, int rc)
1922 {
1923         struct ptlrpc_request *new_req;
1924         struct osc_brw_async_args *new_aa;
1925         struct osc_async_page *oap;
1926         ENTRY;
1927
1928         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1929                   "redo for recoverable error %d", rc);
1930
1931         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1932                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1933                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1934                                   aa->aa_ppga, &new_req, 1);
1935         if (rc)
1936                 RETURN(rc);
1937
1938         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1939                 if (oap->oap_request != NULL) {
1940                         LASSERTF(request == oap->oap_request,
1941                                  "request %p != oap_request %p\n",
1942                                  request, oap->oap_request);
1943                         if (oap->oap_interrupted) {
1944                                 ptlrpc_req_finished(new_req);
1945                                 RETURN(-EINTR);
1946                         }
1947                 }
1948         }
1949         /*
1950          * New request takes over pga and oaps from old request.
1951          * Note that copying a list_head doesn't work, need to move it...
1952          */
1953         aa->aa_resends++;
1954         new_req->rq_interpret_reply = request->rq_interpret_reply;
1955         new_req->rq_async_args = request->rq_async_args;
1956         new_req->rq_commit_cb = request->rq_commit_cb;
1957         /* cap resend delay to the current request timeout, this is similar to
1958          * what ptlrpc does (see after_reply()) */
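        /* For example (hypothetical numbers): with rq_timeout = 30s, the
         * 1st resend is delayed 1s, the 2nd 2s, and so on, while the 30th
         * and later resends all wait the full 30s. */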
1959         if (aa->aa_resends > new_req->rq_timeout)
1960                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1961         else
1962                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1963         new_req->rq_generation_set = 1;
1964         new_req->rq_import_generation = request->rq_import_generation;
1965
1966         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1967
1968         INIT_LIST_HEAD(&new_aa->aa_oaps);
1969         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1970         INIT_LIST_HEAD(&new_aa->aa_exts);
1971         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1972         new_aa->aa_resends = aa->aa_resends;
1973
1974         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1975                 if (oap->oap_request) {
1976                         ptlrpc_req_finished(oap->oap_request);
1977                         oap->oap_request = ptlrpc_request_addref(new_req);
1978                 }
1979         }
1980
1981         /* XXX: This code will run into problems if we ever support adding
1982          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1983          * waiting for all of them to finish. We should inherit the request
1984          * set from the old request. */
1985         ptlrpcd_add_req(new_req);
1986
1987         DEBUG_REQ(D_INFO, new_req, "new request");
1988         RETURN(0);
1989 }
1990
1991 /*
1992  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1993  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1994  * fine for our small page arrays and doesn't require allocation.  it's an
1995  * insertion sort that swaps elements that are strides apart, shrinking the
1996  * stride down until it's '1' and the array is sorted.
1997  */
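/* For example, with num = 10 the ascending loop below yields strides
 * 1, 4, 13 and stops at 13; the sort then performs insertion passes with
 * strides 4 and 1 (the 3h+1 sequence). */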
1998 static void sort_brw_pages(struct brw_page **array, int num)
1999 {
2000         int stride, i, j;
2001         struct brw_page *tmp;
2002
2003         if (num == 1)
2004                 return;
2005         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2006                 ;
2007
2008         do {
2009                 stride /= 3;
2010                 for (i = stride ; i < num ; i++) {
2011                         tmp = array[i];
2012                         j = i;
2013                         while (j >= stride && array[j - stride]->off > tmp->off) {
2014                                 array[j] = array[j - stride];
2015                                 j -= stride;
2016                         }
2017                         array[j] = tmp;
2018                 }
2019         } while (stride > 1);
2020 }
2021
2022 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2023 {
2024         LASSERT(ppga != NULL);
2025         OBD_FREE(ppga, sizeof(*ppga) * count);
2026 }
2027
2028 static int brw_interpret(const struct lu_env *env,
2029                          struct ptlrpc_request *req, void *args, int rc)
2030 {
2031         struct osc_brw_async_args *aa = args;
2032         struct osc_extent *ext;
2033         struct osc_extent *tmp;
2034         struct client_obd *cli = aa->aa_cli;
2035         unsigned long transferred = 0;
2036
2037         ENTRY;
2038
2039         rc = osc_brw_fini_request(req, rc);
2040         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2041         /*
2042          * When server returns -EINPROGRESS, client should always retry
2043          * regardless of the number of times the bulk was resent already.
2044          */
2045         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2046                 if (req->rq_import_generation !=
2047                     req->rq_import->imp_generation) {
2048                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2049                                ""DOSTID", rc = %d.\n",
2050                                req->rq_import->imp_obd->obd_name,
2051                                POSTID(&aa->aa_oa->o_oi), rc);
2052                 } else if (rc == -EINPROGRESS ||
2053                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2054                         rc = osc_brw_redo_request(req, aa, rc);
2055                 } else {
2056                         CERROR("%s: too many resend retries for object: "
2057                                "%llu:%llu, rc = %d.\n",
2058                                req->rq_import->imp_obd->obd_name,
2059                                POSTID(&aa->aa_oa->o_oi), rc);
2060                 }
2061
2062                 if (rc == 0)
2063                         RETURN(0);
2064                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2065                         rc = -EIO;
2066         }
2067
2068         if (rc == 0) {
2069                 struct obdo *oa = aa->aa_oa;
2070                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2071                 unsigned long valid = 0;
2072                 struct cl_object *obj;
2073                 struct osc_async_page *last;
2074
2075                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2076                 obj = osc2cl(last->oap_obj);
2077
2078                 cl_object_attr_lock(obj);
2079                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2080                         attr->cat_blocks = oa->o_blocks;
2081                         valid |= CAT_BLOCKS;
2082                 }
2083                 if (oa->o_valid & OBD_MD_FLMTIME) {
2084                         attr->cat_mtime = oa->o_mtime;
2085                         valid |= CAT_MTIME;
2086                 }
2087                 if (oa->o_valid & OBD_MD_FLATIME) {
2088                         attr->cat_atime = oa->o_atime;
2089                         valid |= CAT_ATIME;
2090                 }
2091                 if (oa->o_valid & OBD_MD_FLCTIME) {
2092                         attr->cat_ctime = oa->o_ctime;
2093                         valid |= CAT_CTIME;
2094                 }
2095
2096                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2097                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2098                         loff_t last_off = last->oap_count + last->oap_obj_off +
2099                                 last->oap_page_off;
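                        /* last_off is one byte past the end of this RPC in
                         * the file, e.g. (hypothetical) a page at object
                         * offset 8192 with page_off 0 and count 4096 gives
                         * last_off = 12288. */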
2100
2101                         /* Change the file size if this is an out-of-quota or
2102                          * direct I/O write and it extends the file size */
2103                         if (loi->loi_lvb.lvb_size < last_off) {
2104                                 attr->cat_size = last_off;
2105                                 valid |= CAT_SIZE;
2106                         }
2107                         /* Extend KMS if it's not a lockless write */
2108                         if (loi->loi_kms < last_off &&
2109                             oap2osc_page(last)->ops_srvlock == 0) {
2110                                 attr->cat_kms = last_off;
2111                                 valid |= CAT_KMS;
2112                         }
2113                 }
2114
2115                 if (valid != 0)
2116                         cl_object_attr_update(env, obj, attr, valid);
2117                 cl_object_attr_unlock(obj);
2118         }
2119         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2120
2121         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2122                 osc_inc_unstable_pages(req);
2123
2124         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2125                 list_del_init(&ext->oe_link);
2126                 osc_extent_finish(env, ext, 1,
2127                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2128         }
2129         LASSERT(list_empty(&aa->aa_exts));
2130         LASSERT(list_empty(&aa->aa_oaps));
2131
2132         transferred = (req->rq_bulk == NULL ? /* short io */
2133                        aa->aa_requested_nob :
2134                        req->rq_bulk->bd_nob_transferred);
2135
2136         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2137         ptlrpc_lprocfs_brw(req, transferred);
2138
2139         spin_lock(&cli->cl_loi_list_lock);
2140         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2141          * is called so we know whether to go to sync BRWs or wait for more
2142          * RPCs to complete */
2143         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2144                 cli->cl_w_in_flight--;
2145         else
2146                 cli->cl_r_in_flight--;
2147         osc_wake_cache_waiters(cli);
2148         spin_unlock(&cli->cl_loi_list_lock);
2149
2150         osc_io_unplug(env, cli, NULL);
2151         RETURN(rc);
2152 }
2153
2154 static void brw_commit(struct ptlrpc_request *req)
2155 {
2156         /* If osc_inc_unstable_pages (via osc_extent_finish) races with this
2157          * function, which is called via rq_commit_cb, we need to ensure
2158          * osc_dec_unstable_pages is still called. Otherwise unstable
2159          * pages may be leaked. */
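        /* Sketch of the two orderings (our reading of the code, not from
         * the original comment): if osc_inc_unstable_pages already ran,
         * rq_unstable is set and we decrement here; if commit wins the
         * race, we only mark rq_committed, and the increment side is
         * presumably expected to notice that and balance the count. */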
2160         spin_lock(&req->rq_lock);
2161         if (likely(req->rq_unstable)) {
2162                 req->rq_unstable = 0;
2163                 spin_unlock(&req->rq_lock);
2164
2165                 osc_dec_unstable_pages(req);
2166         } else {
2167                 req->rq_committed = 1;
2168                 spin_unlock(&req->rq_lock);
2169         }
2170 }
2171
2172 /**
2173  * Build an RPC from the list of extents @ext_list. The caller must ensure
2174  * that the total number of pages in this list does NOT exceed the maximum
2175  * pages per RPC. Extents in the list must be in OES_RPC state.
2176  */
2177 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2178                   struct list_head *ext_list, int cmd)
2179 {
2180         struct ptlrpc_request           *req = NULL;
2181         struct osc_extent               *ext;
2182         struct brw_page                 **pga = NULL;
2183         struct osc_brw_async_args       *aa = NULL;
2184         struct obdo                     *oa = NULL;
2185         struct osc_async_page           *oap;
2186         struct osc_object               *obj = NULL;
2187         struct cl_req_attr              *crattr = NULL;
2188         loff_t                          starting_offset = OBD_OBJECT_EOF;
2189         loff_t                          ending_offset = 0;
2190         int                             mpflag = 0;
2191         int                             mem_tight = 0;
2192         int                             page_count = 0;
2193         bool                            soft_sync = false;
2194         bool                            interrupted = false;
2195         bool                            ndelay = false;
2196         int                             i;
2197         int                             grant = 0;
2198         int                             rc;
2199         __u32                           layout_version = 0;
2200         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2201         struct ost_body                 *body;
2202         ENTRY;
2203         LASSERT(!list_empty(ext_list));
2204
2205         /* add pages into rpc_list to build BRW rpc */
2206         list_for_each_entry(ext, ext_list, oe_link) {
2207                 LASSERT(ext->oe_state == OES_RPC);
2208                 mem_tight |= ext->oe_memalloc;
2209                 grant += ext->oe_grants;
2210                 page_count += ext->oe_nr_pages;
2211                 layout_version = MAX(layout_version, ext->oe_layout_version);
2212                 if (obj == NULL)
2213                         obj = ext->oe_obj;
2214         }
2215
2216         soft_sync = osc_over_unstable_soft_limit(cli);
2217         if (mem_tight)
2218                 mpflag = cfs_memory_pressure_get_and_set();
2219
2220         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2221         if (pga == NULL)
2222                 GOTO(out, rc = -ENOMEM);
2223
2224         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2225         if (oa == NULL)
2226                 GOTO(out, rc = -ENOMEM);
2227
2228         i = 0;
2229         list_for_each_entry(ext, ext_list, oe_link) {
2230                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2231                         if (mem_tight)
2232                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2233                         if (soft_sync)
2234                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2235                         pga[i] = &oap->oap_brw_page;
2236                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2237                         i++;
2238
2239                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2240                         if (starting_offset == OBD_OBJECT_EOF ||
2241                             starting_offset > oap->oap_obj_off)
2242                                 starting_offset = oap->oap_obj_off;
2243                         else
2244                                 LASSERT(oap->oap_page_off == 0);
2245                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2246                                 ending_offset = oap->oap_obj_off +
2247                                                 oap->oap_count;
2248                         else
2249                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2250                                         PAGE_SIZE);
2251                         if (oap->oap_interrupted)
2252                                 interrupted = true;
2253                 }
2254                 if (ext->oe_ndelay)
2255                         ndelay = true;
2256         }
2257
2258         /* first page in the list */
2259         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2260
2261         crattr = &osc_env_info(env)->oti_req_attr;
2262         memset(crattr, 0, sizeof(*crattr));
2263         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2264         crattr->cra_flags = ~0ULL;
2265         crattr->cra_page = oap2cl_page(oap);
2266         crattr->cra_oa = oa;
2267         cl_req_attr_set(env, osc2cl(obj), crattr);
2268
2269         if (cmd == OBD_BRW_WRITE) {
2270                 oa->o_grant_used = grant;
2271                 if (layout_version > 0) {
2272                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2273                                PFID(&oa->o_oi.oi_fid), layout_version);
2274
2275                         oa->o_layout_version = layout_version;
2276                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2277                 }
2278         }
2279
2280         sort_brw_pages(pga, page_count);
2281         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2282         if (rc != 0) {
2283                 CERROR("prep_req failed: %d\n", rc);
2284                 GOTO(out, rc);
2285         }
2286
2287         req->rq_commit_cb = brw_commit;
2288         req->rq_interpret_reply = brw_interpret;
2289         req->rq_memalloc = mem_tight != 0;
2290         oap->oap_request = ptlrpc_request_addref(req);
2291         if (interrupted && !req->rq_intr)
2292                 ptlrpc_mark_interrupted(req);
2293         if (ndelay) {
2294                 req->rq_no_resend = req->rq_no_delay = 1;
2295                 /* We should probably set a shorter timeout value here,
2296                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2297                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2298         }
2299
2300         /* Need to update the timestamps after the request is built in case
2301          * we race with setattr (locally or in queue at OST).  If OST gets
2302          * later setattr before earlier BRW (as determined by the request xid),
2303          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2304          * way to do this in a single call.  bug 10150 */
2305         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2306         crattr->cra_oa = &body->oa;
2307         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2308         cl_req_attr_set(env, osc2cl(obj), crattr);
2309         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2310
2311         aa = ptlrpc_req_async_args(aa, req);
2312         INIT_LIST_HEAD(&aa->aa_oaps);
2313         list_splice_init(&rpc_list, &aa->aa_oaps);
2314         INIT_LIST_HEAD(&aa->aa_exts);
2315         list_splice_init(ext_list, &aa->aa_exts);
2316
2317         spin_lock(&cli->cl_loi_list_lock);
2318         starting_offset >>= PAGE_SHIFT;
2319         if (cmd == OBD_BRW_READ) {
2320                 cli->cl_r_in_flight++;
2321                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2322                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2323                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2324                                       starting_offset + 1);
2325         } else {
2326                 cli->cl_w_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2330                                       starting_offset + 1);
2331         }
2332         spin_unlock(&cli->cl_loi_list_lock);
2333
2334         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2335                   page_count, aa, cli->cl_r_in_flight,
2336                   cli->cl_w_in_flight);
2337         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2338
2339         ptlrpcd_add_req(req);
2340         rc = 0;
2341         EXIT;
2342
2343 out:
2344         if (mem_tight != 0)
2345                 cfs_memory_pressure_restore(mpflag);
2346
2347         if (rc != 0) {
2348                 LASSERT(req == NULL);
2349
2350                 if (oa)
2351                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2352                 if (pga)
2353                         OBD_FREE(pga, sizeof(*pga) * page_count);
2354                 /* this should happen rarely and is pretty bad; it makes the
2355                  * pending list not follow the dirty order */
2356                 while (!list_empty(ext_list)) {
2357                         ext = list_entry(ext_list->next, struct osc_extent,
2358                                          oe_link);
2359                         list_del_init(&ext->oe_link);
2360                         osc_extent_finish(env, ext, 0, rc);
2361                 }
2362         }
2363         RETURN(rc);
2364 }
2365
2366 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2367 {
2368         int set = 0;
2369
2370         LASSERT(lock != NULL);
2371
2372         lock_res_and_lock(lock);
2373
2374         if (lock->l_ast_data == NULL)
2375                 lock->l_ast_data = data;
2376         if (lock->l_ast_data == data)
2377                 set = 1;
2378
2379         unlock_res_and_lock(lock);
2380
2381         return set;
2382 }
2383
2384 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2385                      void *cookie, struct lustre_handle *lockh,
2386                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2387                      int errcode)
2388 {
2389         bool intent = *flags & LDLM_FL_HAS_INTENT;
2390         int rc;
2391         ENTRY;
2392
2393         /* The request was created before the ldlm_cli_enqueue() call. */
2394         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2395                 struct ldlm_reply *rep;
2396
2397                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2398                 LASSERT(rep != NULL);
2399
2400                 rep->lock_policy_res1 =
2401                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2402                 if (rep->lock_policy_res1)
2403                         errcode = rep->lock_policy_res1;
2404                 if (!speculative)
2405                         *flags |= LDLM_FL_LVB_READY;
2406         } else if (errcode == ELDLM_OK) {
2407                 *flags |= LDLM_FL_LVB_READY;
2408         }
2409
2410         /* Call the update callback. */
2411         rc = (*upcall)(cookie, lockh, errcode);
2412
2413         /* release the reference taken in ldlm_cli_enqueue() */
2414         if (errcode == ELDLM_LOCK_MATCHED)
2415                 errcode = ELDLM_OK;
2416         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2417                 ldlm_lock_decref(lockh, mode);
2418
2419         RETURN(rc);
2420 }
2421
2422 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2423                           void *args, int rc)
2424 {
2425         struct osc_enqueue_args *aa = args;
2426         struct ldlm_lock *lock;
2427         struct lustre_handle *lockh = &aa->oa_lockh;
2428         enum ldlm_mode mode = aa->oa_mode;
2429         struct ost_lvb *lvb = aa->oa_lvb;
2430         __u32 lvb_len = sizeof(*lvb);
2431         __u64 flags = 0;
2432
2433         ENTRY;
2434
2435         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2436          * be valid. */
2437         lock = ldlm_handle2lock(lockh);
2438         LASSERTF(lock != NULL,
2439                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2440                  lockh->cookie, req, aa);
2441
2442         /* Take an additional reference so that a blocking AST that
2443          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2444          * to arrive after an upcall has been executed by
2445          * osc_enqueue_fini(). */
2446         ldlm_lock_addref(lockh, mode);
2447
2448         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2449         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2450
2451         /* Let the CP AST grant the lock first. */
2452         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2453
2454         if (aa->oa_speculative) {
2455                 LASSERT(aa->oa_lvb == NULL);
2456                 LASSERT(aa->oa_flags == NULL);
2457                 aa->oa_flags = &flags;
2458         }
2459
2460         /* Complete obtaining the lock procedure. */
2461         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2462                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2463                                    lockh, rc);
2464         /* Complete osc stuff. */
2465         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2466                               aa->oa_flags, aa->oa_speculative, rc);
2467
2468         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2469
2470         ldlm_lock_decref(lockh, mode);
2471         LDLM_LOCK_PUT(lock);
2472         RETURN(rc);
2473 }
2474
2475 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2476
2477 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2478  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2479  * with other synchronous requests, but holding some locks while trying to
2480  * obtain others may take a considerable amount of time in the case of OST
2481  * failure; and when a client does not release a lock that other sync requests
2482  * are waiting on, the client is evicted from the cluster -- such scenarios
2483  * make life difficult, so release locks just after they are obtained. */
2484 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2485                      __u64 *flags, union ldlm_policy_data *policy,
2486                      struct ost_lvb *lvb, int kms_valid,
2487                      osc_enqueue_upcall_f upcall, void *cookie,
2488                      struct ldlm_enqueue_info *einfo,
2489                      struct ptlrpc_request_set *rqset, int async,
2490                      bool speculative)
2491 {
2492         struct obd_device *obd = exp->exp_obd;
2493         struct lustre_handle lockh = { 0 };
2494         struct ptlrpc_request *req = NULL;
2495         int intent = *flags & LDLM_FL_HAS_INTENT;
2496         __u64 match_flags = *flags;
2497         enum ldlm_mode mode;
2498         int rc;
2499         ENTRY;
2500
2501         /* Filesystem lock extents are extended to page boundaries so that
2502          * dealing with the page cache is a little smoother.  */
2503         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2504         policy->l_extent.end |= ~PAGE_MASK;
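        /* For instance, with 4 KiB pages a request for bytes [5000, 6000]
         * is widened to [4096, 8191]: the start is rounded down to a page
         * boundary, the end up to the last byte of its page. */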
2505
2506         /*
2507          * kms is not valid when either object is completely fresh (so that no
2508          * locks are cached), or object was evicted. In the latter case cached
2509          * lock cannot be used, because it would prime inode state with
2510          * potentially stale LVB.
2511          */
2512         if (!kms_valid)
2513                 goto no_match;
2514
2515         /* Next, search for already existing extent locks that will cover us */
2516         /* If we're trying to read, we also search for an existing PW lock.  The
2517          * VFS and page cache already protect us locally, so lots of readers/
2518          * writers can share a single PW lock.
2519          *
2520          * There are problems with conversion deadlocks, so instead of
2521          * converting a read lock to a write lock, we'll just enqueue a new
2522          * one.
2523          *
2524          * At some point we should cancel the read lock instead of making them
2525          * send us a blocking callback, but there are problems with canceling
2526          * locks out from other users right now, too. */
2527         mode = einfo->ei_mode;
2528         if (einfo->ei_mode == LCK_PR)
2529                 mode |= LCK_PW;
2530         /* Normal lock requests must wait for the LVB to be ready before
2531          * matching a lock; speculative lock requests do not need to,
2532          * because they will not actually use the lock. */
2533         if (!speculative)
2534                 match_flags |= LDLM_FL_LVB_READY;
2535         if (intent != 0)
2536                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2537         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2538                                einfo->ei_type, policy, mode, &lockh, 0);
2539         if (mode) {
2540                 struct ldlm_lock *matched;
2541
2542                 if (*flags & LDLM_FL_TEST_LOCK)
2543                         RETURN(ELDLM_OK);
2544
2545                 matched = ldlm_handle2lock(&lockh);
2546                 if (speculative) {
2547                         /* This DLM lock request is speculative, and does not
2548                          * have an associated IO request. Therefore if there
2549                          * is already a DLM lock, it will just inform the
2550                          * caller to cancel the request for this stripe. */
2551                         lock_res_and_lock(matched);
2552                         if (ldlm_extent_equal(&policy->l_extent,
2553                             &matched->l_policy_data.l_extent))
2554                                 rc = -EEXIST;
2555                         else
2556                                 rc = -ECANCELED;
2557                         unlock_res_and_lock(matched);
2558
2559                         ldlm_lock_decref(&lockh, mode);
2560                         LDLM_LOCK_PUT(matched);
2561                         RETURN(rc);
2562                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2563                         *flags |= LDLM_FL_LVB_READY;
2564
2565                         /* We already have a lock, and it's referenced. */
2566                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2567
2568                         ldlm_lock_decref(&lockh, mode);
2569                         LDLM_LOCK_PUT(matched);
2570                         RETURN(ELDLM_OK);
2571                 } else {
2572                         ldlm_lock_decref(&lockh, mode);
2573                         LDLM_LOCK_PUT(matched);
2574                 }
2575         }
2576
2577 no_match:
2578         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2579                 RETURN(-ENOLCK);
2580
2581         if (intent) {
2582                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2583                                            &RQF_LDLM_ENQUEUE_LVB);
2584                 if (req == NULL)
2585                         RETURN(-ENOMEM);
2586
2587                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2588                 if (rc) {
2589                         ptlrpc_request_free(req);
2590                         RETURN(rc);
2591                 }
2592
2593                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2594                                      sizeof(*lvb));
2595                 ptlrpc_request_set_replen(req);
2596         }
2597
2598         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2599         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2600
2601         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2602                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2603         if (async) {
2604                 if (!rc) {
2605                         struct osc_enqueue_args *aa;
2606                         aa = ptlrpc_req_async_args(aa, req);
2607                         aa->oa_exp         = exp;
2608                         aa->oa_mode        = einfo->ei_mode;
2609                         aa->oa_type        = einfo->ei_type;
2610                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2611                         aa->oa_upcall      = upcall;
2612                         aa->oa_cookie      = cookie;
2613                         aa->oa_speculative = speculative;
2614                         if (!speculative) {
2615                                 aa->oa_flags  = flags;
2616                                 aa->oa_lvb    = lvb;
2617                         } else {
2618                                 /* speculative locks essentially enqueue a
2619                                  * DLM lock in advance, so we don't care
2620                                  * about the result of the enqueue. */
2621                                 aa->oa_lvb    = NULL;
2622                                 aa->oa_flags  = NULL;
2623                         }
2624
2625                         req->rq_interpret_reply = osc_enqueue_interpret;
2626                         if (rqset == PTLRPCD_SET)
2627                                 ptlrpcd_add_req(req);
2628                         else
2629                                 ptlrpc_set_add_req(rqset, req);
2630                 } else if (intent) {
2631                         ptlrpc_req_finished(req);
2632                 }
2633                 RETURN(rc);
2634         }
2635
2636         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2637                               flags, speculative, rc);
2638         if (intent)
2639                 ptlrpc_req_finished(req);
2640
2641         RETURN(rc);
2642 }
2643
2644 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2645                    enum ldlm_type type, union ldlm_policy_data *policy,
2646                    enum ldlm_mode mode, __u64 *flags, void *data,
2647                    struct lustre_handle *lockh, int unref)
2648 {
2649         struct obd_device *obd = exp->exp_obd;
2650         __u64 lflags = *flags;
2651         enum ldlm_mode rc;
2652         ENTRY;
2653
2654         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2655                 RETURN(-EIO);
2656
2657         /* Filesystem lock extents are extended to page boundaries so that
2658          * dealing with the page cache is a little smoother */
2659         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2660         policy->l_extent.end |= ~PAGE_MASK;
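             /* Illustrative example, assuming 4 KiB pages (PAGE_MASK = ~0xfff):
              * a byte range [5000, 9000] is widened to [4096, 12287], i.e.
              * start rounds down and end rounds up to whole-page boundaries. */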
2661
2662         /* Next, search for already existing extent locks that will cover us.
2663          * If we're trying to read, we also search for an existing PW lock: the
2664          * VFS and page cache already protect us locally, so many readers and
2665          * writers can share a single PW lock. */
2666         rc = mode;
2667         if (mode == LCK_PR)
2668                 rc |= LCK_PW;
2669         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2670                              res_id, type, policy, rc, lockh, unref);
2671         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2672                 RETURN(rc);
2673
2674         if (data != NULL) {
2675                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2676
2677                 LASSERT(lock != NULL);
2678                 if (!osc_set_lock_data(lock, data)) {
2679                         ldlm_lock_decref(lockh, rc);
2680                         rc = 0;
2681                 }
2682                 LDLM_LOCK_PUT(lock);
2683         }
2684         RETURN(rc);
2685 }
2686
2687 static int osc_statfs_interpret(const struct lu_env *env,
2688                                 struct ptlrpc_request *req, void *args, int rc)
2689 {
2690         struct osc_async_args *aa = args;
2691         struct obd_statfs *msfs;
2692
2693         ENTRY;
2694         if (rc == -EBADR)
2695                 /*
2696                  * The request has in fact never been sent due to issues at
2697                  * a higher level (LOV).  Exit immediately since the caller
2698                  * is aware of the problem and takes care of the clean up.
2699                  */
2700                 RETURN(rc);
2701
2702         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2703             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2704                 GOTO(out, rc = 0);
2705
2706         if (rc != 0)
2707                 GOTO(out, rc);
2708
2709         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2710         if (msfs == NULL)
2711                 GOTO(out, rc = -EPROTO);
2712
2713         *aa->aa_oi->oi_osfs = *msfs;
2714 out:
2715         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2716
2717         RETURN(rc);
2718 }
2719
2720 static int osc_statfs_async(struct obd_export *exp,
2721                             struct obd_info *oinfo, time64_t max_age,
2722                             struct ptlrpc_request_set *rqset)
2723 {
2724         struct obd_device     *obd = class_exp2obd(exp);
2725         struct ptlrpc_request *req;
2726         struct osc_async_args *aa;
2727         int rc;
2728         ENTRY;
2729
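             /* Serve from the per-obd statfs cache while it is fresh enough;
              * callers typically pass something like (illustrative)
              * ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS as max_age. */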
2730         if (obd->obd_osfs_age >= max_age) {
2731                 CDEBUG(D_SUPER,
2732                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2733                        obd->obd_name, &obd->obd_osfs,
2734                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2735                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2736                 spin_lock(&obd->obd_osfs_lock);
2737                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2738                 spin_unlock(&obd->obd_osfs_lock);
2739                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2740                 if (oinfo->oi_cb_up)
2741                         oinfo->oi_cb_up(oinfo, 0);
2742
2743                 RETURN(0);
2744         }
2745
2746         /* We could possibly pass max_age in the request (as an absolute
2747          * timestamp or a "seconds.usec ago") so the target can avoid doing
2748          * extra calls into the filesystem if that isn't necessary (e.g.
2749          * during mount that would help a bit).  Having relative timestamps
2750          * is not so great if request processing is slow, while absolute
2751          * timestamps are not ideal because they need time synchronization. */
2752         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2753         if (req == NULL)
2754                 RETURN(-ENOMEM);
2755
2756         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2757         if (rc) {
2758                 ptlrpc_request_free(req);
2759                 RETURN(rc);
2760         }
2761         ptlrpc_request_set_replen(req);
2762         req->rq_request_portal = OST_CREATE_PORTAL;
2763         ptlrpc_at_set_req_timeout(req);
2764
2765         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2766                 /* procfs requests must not wait for statfs to avoid deadlock */
2767                 req->rq_no_resend = 1;
2768                 req->rq_no_delay = 1;
2769         }
2770
2771         req->rq_interpret_reply = osc_statfs_interpret;
2772         aa = ptlrpc_req_async_args(aa, req);
2773         aa->aa_oi = oinfo;
2774
2775         ptlrpc_set_add_req(rqset, req);
2776         RETURN(0);
2777 }
2778
2779 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2780                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2781 {
2782         struct obd_device     *obd = class_exp2obd(exp);
2783         struct obd_statfs     *msfs;
2784         struct ptlrpc_request *req;
2785         struct obd_import     *imp = NULL;
2786         int rc;
2787         ENTRY;
2788
2790         /* Since the request might also come from lprocfs, we need to
2791          * sync this with client_disconnect_export() (bug 15684). */
2792         down_read(&obd->u.cli.cl_sem);
2793         if (obd->u.cli.cl_import)
2794                 imp = class_import_get(obd->u.cli.cl_import);
2795         up_read(&obd->u.cli.cl_sem);
2796         if (!imp)
2797                 RETURN(-ENODEV);
2798
2799         /* We could possibly pass max_age in the request (as an absolute
2800          * timestamp or a "seconds.usec ago") so the target can avoid doing
2801          * extra calls into the filesystem if that isn't necessary (e.g.
2802          * during mount that would help a bit).  Having relative timestamps
2803          * is not so great if request processing is slow, while absolute
2804          * timestamps are not ideal because they need time synchronization. */
2805         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2806
2807         class_import_put(imp);
2808
2809         if (req == NULL)
2810                 RETURN(-ENOMEM);
2811
2812         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2813         if (rc) {
2814                 ptlrpc_request_free(req);
2815                 RETURN(rc);
2816         }
2817         ptlrpc_request_set_replen(req);
2818         req->rq_request_portal = OST_CREATE_PORTAL;
2819         ptlrpc_at_set_req_timeout(req);
2820
2821         if (flags & OBD_STATFS_NODELAY) {
2822                 /* procfs requests must not wait for statfs to avoid deadlock */
2823                 req->rq_no_resend = 1;
2824                 req->rq_no_delay = 1;
2825         }
2826
2827         rc = ptlrpc_queue_wait(req);
2828         if (rc)
2829                 GOTO(out, rc);
2830
2831         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2832         if (msfs == NULL)
2833                 GOTO(out, rc = -EPROTO);
2834
2835         *osfs = *msfs;
2836
2837         EXIT;
2838 out:
2839         ptlrpc_req_finished(req);
2840         return rc;
2841 }
2842
2843 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2844                          void *karg, void __user *uarg)
2845 {
2846         struct obd_device *obd = exp->exp_obd;
2847         struct obd_ioctl_data *data = karg;
2848         int rc = 0;
2849
2850         ENTRY;
2851         if (!try_module_get(THIS_MODULE)) {
2852                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2853                        module_name(THIS_MODULE));
2854                 return -EINVAL;
2855         }
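             /* These commands normally arrive via the obd ioctl path from
              * userspace tools; illustratively, "lctl --device <osc> recover"
              * maps to OBD_IOC_CLIENT_RECOVER and "lctl --device <osc>
              * activate"/"deactivate" to IOC_OSC_SET_ACTIVE. */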
2856         switch (cmd) {
2857         case OBD_IOC_CLIENT_RECOVER:
2858                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2859                                            data->ioc_inlbuf1, 0);
2860                 if (rc > 0)
2861                         rc = 0;
2862                 break;
2863         case IOC_OSC_SET_ACTIVE:
2864                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2865                                               data->ioc_offset);
2866                 break;
2867         default:
2868                 rc = -ENOTTY;
2869                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2870                        obd->obd_name, cmd, current_comm(), rc);
2871                 break;
2872         }
2873
2874         module_put(THIS_MODULE);
2875         return rc;
2876 }
2877
2878 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2879                        u32 keylen, void *key, u32 vallen, void *val,
2880                        struct ptlrpc_request_set *set)
2881 {
2882         struct ptlrpc_request *req;
2883         struct obd_device     *obd = exp->exp_obd;
2884         struct obd_import     *imp = class_exp2cliimp(exp);
2885         char                  *tmp;
2886         int                    rc;
2887         ENTRY;
2888
2889         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2890
2891         if (KEY_IS(KEY_CHECKSUM)) {
2892                 if (vallen != sizeof(int))
2893                         RETURN(-EINVAL);
2894                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2895                 RETURN(0);
2896         }
2897
2898         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2899                 sptlrpc_conf_client_adapt(obd);
2900                 RETURN(0);
2901         }
2902
2903         if (KEY_IS(KEY_FLUSH_CTX)) {
2904                 sptlrpc_import_flush_my_ctx(imp);
2905                 RETURN(0);
2906         }
2907
2908         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2909                 struct client_obd *cli = &obd->u.cli;
2910                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2911                 long target = *(long *)val;
2912
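                     /* Shrink at most half of the cached LRU pages per call; a
                      * worked example (illustrative numbers): with 1000 pages
                      * on the LRU list and a caller target of 300, nr becomes
                      * min(500, 300) and *val is decremented by the number of
                      * pages actually freed. */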
2913                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2914                 *(long *)val -= nr;
2915                 RETURN(0);
2916         }
2917
2918         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2919                 RETURN(-EINVAL);
2920
2921         /* We pass all other commands directly to the OST. Since nobody calls
2922          * OSC methods directly and everybody is supposed to go through LOV,
2923          * we assume LOV checked invalid values for us.
2924          * The only recognised values so far are evict_by_nid and mds_conn.
2925          * Even if something bad gets through, we'd get a -EINVAL from the
2926          * OST anyway. */
2927
2928         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2929                                                 &RQF_OST_SET_GRANT_INFO :
2930                                                 &RQF_OBD_SET_INFO);
2931         if (req == NULL)
2932                 RETURN(-ENOMEM);
2933
2934         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2935                              RCL_CLIENT, keylen);
2936         if (!KEY_IS(KEY_GRANT_SHRINK))
2937                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2938                                      RCL_CLIENT, vallen);
2939         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2940         if (rc) {
2941                 ptlrpc_request_free(req);
2942                 RETURN(rc);
2943         }
2944
2945         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2946         memcpy(tmp, key, keylen);
2947         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2948                                                         &RMF_OST_BODY :
2949                                                         &RMF_SETINFO_VAL);
2950         memcpy(tmp, val, vallen);
2951
2952         if (KEY_IS(KEY_GRANT_SHRINK)) {
2953                 struct osc_grant_args *aa;
2954                 struct obdo *oa;
2955
2956                 aa = ptlrpc_req_async_args(aa, req);
2957                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2958                 if (!oa) {
2959                         ptlrpc_req_finished(req);
2960                         RETURN(-ENOMEM);
2961                 }
2962                 *oa = ((struct ost_body *)val)->oa;
2963                 aa->aa_oa = oa;
2964                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2965         }
2966
2967         ptlrpc_request_set_replen(req);
2968         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2969                 LASSERT(set != NULL);
2970                 ptlrpc_set_add_req(set, req);
2971                 ptlrpc_check_set(NULL, set);
2972         } else {
2973                 ptlrpcd_add_req(req);
2974         }
2975
2976         RETURN(0);
2977 }
2978 EXPORT_SYMBOL(osc_set_info_async);
2979
2980 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2981                   struct obd_device *obd, struct obd_uuid *cluuid,
2982                   struct obd_connect_data *data, void *localdata)
2983 {
2984         struct client_obd *cli = &obd->u.cli;
2985
2986         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2987                 long lost_grant;
2988                 long grant;
2989
2990                 spin_lock(&cli->cl_loi_list_lock);
2991                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2992                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2993                         /* restore ocd_grant_blkbits as client page bits */
2994                         data->ocd_grant_blkbits = PAGE_SHIFT;
2995                         grant += cli->cl_dirty_grant;
2996                 } else {
2997                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2998                 }
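                     /* Ask the server to restore the grant we believe we hold;
                      * a worked example (illustrative): with 4 KiB pages, no
                      * GRANT_PARAM support and 100 dirty pages, we request
                      * avail + reserved + (100 << 12) bytes, falling back to
                      * two full BRW RPCs worth of grant when the sum is zero. */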
2999                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3000                 lost_grant = cli->cl_lost_grant;
3001                 cli->cl_lost_grant = 0;
3002                 spin_unlock(&cli->cl_loi_list_lock);
3003
3004                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3005                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3006                        data->ocd_version, data->ocd_grant, lost_grant);
3007         }
3008
3009         RETURN(0);
3010 }
3011 EXPORT_SYMBOL(osc_reconnect);
3012
3013 int osc_disconnect(struct obd_export *exp)
3014 {
3015         struct obd_device *obd = class_exp2obd(exp);
3016         int rc;
3017
3018         rc = client_disconnect_export(exp);
3019         /**
3020          * Initially we put del_shrink_grant before disconnect_export, but it
3021          * causes the following problem if setup (connect) and cleanup
3022          * (disconnect) are tangled together.
3023          *      connect p1                     disconnect p2
3024          *   ptlrpc_connect_import
3025          *     ...............               class_manual_cleanup
3026          *                                     osc_disconnect
3027          *                                     del_shrink_grant
3028          *   ptlrpc_connect_interrupt
3029          *     osc_init_grant
3030          *   add this client to shrink list
3031          *                                      cleanup_osc
3032          * Bang! The grant shrink thread triggers the shrink. (bug 18662)
3033          */
3034         osc_del_grant_list(&obd->u.cli);
3035         return rc;
3036 }
3037 EXPORT_SYMBOL(osc_disconnect);
3038
3039 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3040                                  struct hlist_node *hnode, void *arg)
3041 {
3042         struct lu_env *env = arg;
3043         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3044         struct ldlm_lock *lock;
3045         struct osc_object *osc = NULL;
3046         ENTRY;
3047
3048         lock_res(res);
3049         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3050                 if (lock->l_ast_data != NULL && osc == NULL) {
3051                         osc = lock->l_ast_data;
3052                         cl_object_get(osc2cl(osc));
3053                 }
3054
3055                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock will
3056                  * be canceled by the 2nd round of ldlm_namespace_cleanup()
3057                  * in osc_import_event(). */
3058                 ldlm_clear_cleaned(lock);
3059         }
3060         unlock_res(res);
3061
3062         if (osc != NULL) {
3063                 osc_object_invalidate(env, osc);
3064                 cl_object_put(env, osc2cl(osc));
3065         }
3066
3067         RETURN(0);
3068 }
3069 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3070
3071 static int osc_import_event(struct obd_device *obd,
3072                             struct obd_import *imp,
3073                             enum obd_import_event event)
3074 {
3075         struct client_obd *cli;
3076         int rc = 0;
3077
3078         ENTRY;
3079         LASSERT(imp->imp_obd == obd);
3080
3081         switch (event) {
3082         case IMP_EVENT_DISCON: {
3083                 cli = &obd->u.cli;
3084                 spin_lock(&cli->cl_loi_list_lock);
3085                 cli->cl_avail_grant = 0;
3086                 cli->cl_lost_grant = 0;
3087                 spin_unlock(&cli->cl_loi_list_lock);
3088                 break;
3089         }
3090         case IMP_EVENT_INACTIVE: {
3091                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3092                 break;
3093         }
3094         case IMP_EVENT_INVALIDATE: {
3095                 struct ldlm_namespace *ns = obd->obd_namespace;
3096                 struct lu_env         *env;
3097                 __u16                  refcheck;
3098
3099                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3100
3101                 env = cl_env_get(&refcheck);
3102                 if (!IS_ERR(env)) {
3103                         osc_io_unplug(env, &obd->u.cli, NULL);
3104
3105                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3106                                                  osc_ldlm_resource_invalidate,
3107                                                  env, 0);
3108                         cl_env_put(env, &refcheck);
3109
3110                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3111                 } else {
3112                         rc = PTR_ERR(env);
                     }
3113                 break;
3114         }
3115         case IMP_EVENT_ACTIVE: {
3116                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3117                 break;
3118         }
3119         case IMP_EVENT_OCD: {
3120                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3121
3122                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3123                         osc_init_grant(&obd->u.cli, ocd);
3124
3125                 /* See bug 7198 */
3126                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3127                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3128
3129                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3130                 break;
3131         }
3132         case IMP_EVENT_DEACTIVATE: {
3133                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3134                 break;
3135         }
3136         case IMP_EVENT_ACTIVATE: {
3137                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3138                 break;
3139         }
3140         default:
3141                 CERROR("Unknown import event %d\n", event);
3142                 LBUG();
3143         }
3144         RETURN(rc);
3145 }
3146
3147 /**
3148  * Determine whether the lock can be canceled before replaying the lock
3149  * during recovery, see bug16774 for detailed information.
3150  *
3151  * \retval zero the lock can't be canceled
3152  * \retval other ok to cancel
3153  */
3154 static int osc_cancel_weight(struct ldlm_lock *lock)
3155 {
3156         /*
3157          * Cancel all unused and granted extent locks.
3158          */
3159         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3160             ldlm_is_granted(lock) &&
3161             osc_ldlm_weigh_ast(lock) == 0)
3162                 RETURN(1);
3163
3164         RETURN(0);
3165 }
3166
3167 static int brw_queue_work(const struct lu_env *env, void *data)
3168 {
3169         struct client_obd *cli = data;
3170
3171         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3172
3173         osc_io_unplug(env, cli, NULL);
3174         RETURN(0);
3175 }
3176
3177 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3178 {
3179         struct client_obd *cli = &obd->u.cli;
3180         void *handler;
3181         int rc;
3182
3183         ENTRY;
3184
3185         rc = ptlrpcd_addref();
3186         if (rc)
3187                 RETURN(rc);
3188
3189         rc = client_obd_setup(obd, lcfg);
3190         if (rc)
3191                 GOTO(out_ptlrpcd, rc);
3192
3194         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3195         if (IS_ERR(handler))
3196                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3197         cli->cl_writeback_work = handler;
3198
3199         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3200         if (IS_ERR(handler))
3201                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3202         cli->cl_lru_work = handler;
3203
3204         rc = osc_quota_setup(obd);
3205         if (rc)
3206                 GOTO(out_ptlrpcd_work, rc);
3207
3208         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3209         osc_update_next_shrink(cli);
3210
3211         RETURN(rc);
3212
3213 out_ptlrpcd_work:
3214         if (cli->cl_writeback_work != NULL) {
3215                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3216                 cli->cl_writeback_work = NULL;
3217         }
3218         if (cli->cl_lru_work != NULL) {
3219                 ptlrpcd_destroy_work(cli->cl_lru_work);
3220                 cli->cl_lru_work = NULL;
3221         }
3222         client_obd_cleanup(obd);
3223 out_ptlrpcd:
3224         ptlrpcd_decref();
3225         RETURN(rc);
3226 }
3227 EXPORT_SYMBOL(osc_setup_common);
3228
3229 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3230 {
3231         struct client_obd *cli = &obd->u.cli;
3232         int                adding;
3233         int                added;
3234         int                req_count;
3235         int                rc;
3236
3237         ENTRY;
3238
3239         rc = osc_setup_common(obd, lcfg);
3240         if (rc < 0)
3241                 RETURN(rc);
3242
3243         rc = osc_tunables_init(obd);
3244         if (rc)
3245                 RETURN(rc);
3246
3247         /*
3248          * We try to control the total number of requests with an upper limit
3249          * of osc_reqpool_maxreqcount. A race may occasionally push the count
3250          * over the limit, but that is fine.
3251          */
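             /* Worked example (illustrative numbers): with a pool cap of 80
              * requests, 75 already pooled and cl_max_rpcs_in_flight = 8,
              * adding starts at 8 + 2 = 10 and is clipped to 80 - 75 = 5. */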
3252         req_count = atomic_read(&osc_pool_req_count);
3253         if (req_count < osc_reqpool_maxreqcount) {
3254                 adding = cli->cl_max_rpcs_in_flight + 2;
3255                 if (req_count + adding > osc_reqpool_maxreqcount)
3256                         adding = osc_reqpool_maxreqcount - req_count;
3257
3258                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3259                 atomic_add(added, &osc_pool_req_count);
3260         }
3261
3262         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3263
3264         spin_lock(&osc_shrink_lock);
3265         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3266         spin_unlock(&osc_shrink_lock);
3267         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3268         cli->cl_import->imp_idle_debug = D_HA;
3269
3270         RETURN(0);
3271 }
3272
3273 int osc_precleanup_common(struct obd_device *obd)
3274 {
3275         struct client_obd *cli = &obd->u.cli;
3276         ENTRY;
3277
3278         /* LU-464
3279          * for echo client, export may be on zombie list, wait for
3280          * zombie thread to cull it, because cli.cl_import will be
3281          * cleared in client_disconnect_export():
3282          *   class_export_destroy() -> obd_cleanup() ->
3283          *   echo_device_free() -> echo_client_cleanup() ->
3284          *   obd_disconnect() -> osc_disconnect() ->
3285          *   client_disconnect_export()
3286          */
3287         obd_zombie_barrier();
3288         if (cli->cl_writeback_work) {
3289                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3290                 cli->cl_writeback_work = NULL;
3291         }
3292
3293         if (cli->cl_lru_work) {
3294                 ptlrpcd_destroy_work(cli->cl_lru_work);
3295                 cli->cl_lru_work = NULL;
3296         }
3297
3298         obd_cleanup_client_import(obd);
3299         RETURN(0);
3300 }
3301 EXPORT_SYMBOL(osc_precleanup_common);
3302
3303 static int osc_precleanup(struct obd_device *obd)
3304 {
3305         ENTRY;
3306
3307         osc_precleanup_common(obd);
3308
3309         ptlrpc_lprocfs_unregister_obd(obd);
3310         RETURN(0);
3311 }
3312
3313 int osc_cleanup_common(struct obd_device *obd)
3314 {
3315         struct client_obd *cli = &obd->u.cli;
3316         int rc;
3317
3318         ENTRY;
3319
3320         spin_lock(&osc_shrink_lock);
3321         list_del(&cli->cl_shrink_list);
3322         spin_unlock(&osc_shrink_lock);
3323
3324         /* lru cleanup */
3325         if (cli->cl_cache != NULL) {
3326                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3327                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3328                 list_del_init(&cli->cl_lru_osc);
3329                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3330                 cli->cl_lru_left = NULL;
3331                 cl_cache_decref(cli->cl_cache);
3332                 cli->cl_cache = NULL;
3333         }
3334
3335         /* free memory of osc quota cache */
3336         osc_quota_cleanup(obd);
3337
3338         rc = client_obd_cleanup(obd);
3339
3340         ptlrpcd_decref();
3341         RETURN(rc);
3342 }
3343 EXPORT_SYMBOL(osc_cleanup_common);
3344
3345 static struct obd_ops osc_obd_ops = {
3346         .o_owner                = THIS_MODULE,
3347         .o_setup                = osc_setup,
3348         .o_precleanup           = osc_precleanup,
3349         .o_cleanup              = osc_cleanup_common,
3350         .o_add_conn             = client_import_add_conn,
3351         .o_del_conn             = client_import_del_conn,
3352         .o_connect              = client_connect_import,
3353         .o_reconnect            = osc_reconnect,
3354         .o_disconnect           = osc_disconnect,
3355         .o_statfs               = osc_statfs,
3356         .o_statfs_async         = osc_statfs_async,
3357         .o_create               = osc_create,
3358         .o_destroy              = osc_destroy,
3359         .o_getattr              = osc_getattr,
3360         .o_setattr              = osc_setattr,
3361         .o_iocontrol            = osc_iocontrol,
3362         .o_set_info_async       = osc_set_info_async,
3363         .o_import_event         = osc_import_event,
3364         .o_quotactl             = osc_quotactl,
3365 };
3366
3367 static struct shrinker *osc_cache_shrinker;
3368 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3369 DEFINE_SPINLOCK(osc_shrink_lock);
3370
3371 #ifndef HAVE_SHRINKER_COUNT
3372 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3373 {
3374         struct shrink_control scv = {
3375                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3376                 .gfp_mask   = shrink_param(sc, gfp_mask)
3377         };
3378 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3379         struct shrinker *shrinker = NULL;
3380 #endif
3381
3382         (void)osc_cache_shrink_scan(shrinker, &scv);
3383
3384         return osc_cache_shrink_count(shrinker, &scv);
3385 }
3386 #endif
3387
3388 static int __init osc_init(void)
3389 {
3390         unsigned int reqpool_size;
3391         unsigned int reqsize;
3392         int rc;
3393         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3394                          osc_cache_shrink_count, osc_cache_shrink_scan);
3395         ENTRY;
3396
3397         /* Print the address of _any_ initialized kernel symbol from this
3398          * module, to allow debugging with a gdb that doesn't support data
3399          * symbols from modules. */
3400         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3401
3402         rc = lu_kmem_init(osc_caches);
3403         if (rc)
3404                 RETURN(rc);
3405
3406         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3407                                  LUSTRE_OSC_NAME, &osc_device_type);
3408         if (rc)
3409                 GOTO(out_kmem, rc);
3410
3411         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3412
3413         /* This is obviously too much memory; we only prevent overflow here */
3414         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3415                 GOTO(out_type, rc = -EINVAL);
3416
3417         reqpool_size = osc_reqpool_mem_max << 20;
3418
3419         reqsize = 1;
3420         while (reqsize < OST_IO_MAXREQSIZE)
3421                 reqsize = reqsize << 1;
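             /* reqsize is now the smallest power of two >= OST_IO_MAXREQSIZE;
              * an illustrative example: if OST_IO_MAXREQSIZE were 40000 bytes,
              * reqsize would round up to 65536, and the default 5 MiB pool
              * below yields osc_reqpool_maxreqcount = (5 << 20) / 65536 = 80. */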
3422
3423         /*
3424          * We don't enlarge the request count in OSC pool according to
3425          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3426          * tried after normal allocation fails, so a small OSC pool won't
3427          * cause much performance degradation in most cases.
3428          */
3429         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3430
3431         atomic_set(&osc_pool_req_count, 0);
3432         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3433                                           ptlrpc_add_rqs_to_pool);
3434
3435         if (osc_rq_pool == NULL)
3436                 GOTO(out_type, rc = -ENOMEM);
3437
3438         rc = osc_start_grant_work();
3439         if (rc != 0)
3440                 GOTO(out_req_pool, rc);
3441
3442         RETURN(rc);
3443
3444 out_req_pool:
3445         ptlrpc_free_rq_pool(osc_rq_pool);
3446 out_type:
3447         class_unregister_type(LUSTRE_OSC_NAME);
3448 out_kmem:
3449         lu_kmem_fini(osc_caches);
3450
3451         RETURN(rc);
3452 }
3453
3454 static void __exit osc_exit(void)
3455 {
3456         osc_stop_grant_work();
3457         remove_shrinker(osc_cache_shrinker);
3458         class_unregister_type(LUSTRE_OSC_NAME);
3459         lu_kmem_fini(osc_caches);
3460         ptlrpc_free_rq_pool(osc_rq_pool);
3461 }
3462
3463 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3464 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3465 MODULE_VERSION(LUSTRE_VERSION_STRING);
3466 MODULE_LICENSE("GPL");
3467
3468 module_init(osc_init);
3469 module_exit(osc_exit);