Whamcloud - gitweb
10fcc44ffb85fb7285a9f30359d438948e1841c0
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
/*
 * Request-pool state.  These three are non-static, so they are presumably
 * shared with other OSC source files - confirm against osc_internal.h.
 */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB; read-only (0444) parameter */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
58
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
61
/* Grant RPCs reuse the BRW async-args structure under a grant-specific name;
 * the grant code in this file only touches its aa_oa member. */
#define osc_grant_args osc_brw_async_args
63
/*
 * Per-request async arguments for setattr/punch RPCs; filled in by the
 * sender and consumed by osc_setattr_interpret().
 */
struct osc_setattr_args {
        /* obdo that the reply attributes are unpacked into */
        struct obdo             *sa_oa;
        /* completion callback, invoked as sa_upcall(sa_cookie, rc) */
        obd_enqueue_update_f     sa_upcall;
        /* opaque first argument for sa_upcall */
        void                    *sa_cookie;
};
69
/*
 * Per-request async arguments for OST_SYNC RPCs; filled in by
 * osc_sync_base() and consumed by osc_sync_interpret().
 */
struct osc_fsync_args {
        /* object whose cached blocks attribute is refreshed from the reply */
        struct osc_object       *fa_obj;
        /* obdo overwritten with the obdo carried by the reply */
        struct obdo             *fa_oa;
        /* completion callback, invoked as fa_upcall(fa_cookie, rc) */
        obd_enqueue_update_f    fa_upcall;
        /* opaque first argument for fa_upcall */
        void                    *fa_cookie;
};
76
/*
 * Per-request async arguments for OST_LADVISE RPCs; filled in by
 * osc_ladvise_base() and consumed by osc_ladvise_interpret().
 */
struct osc_ladvise_args {
        /* obdo overwritten with the obdo carried by the reply */
        struct obdo             *la_oa;
        /* completion callback, invoked as la_upcall(la_cookie, rc) */
        obd_enqueue_update_f     la_upcall;
        /* opaque first argument for la_upcall */
        void                    *la_cookie;
};
82
/*
 * Forward declarations of file-local helpers.  Their definitions are not in
 * the part of the file visible here (presumably further down - confirm).
 */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
86
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
88 {
89         struct ost_body *body;
90
91         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
92         LASSERT(body);
93
94         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
95 }
96
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
98                        struct obdo *oa)
99 {
100         struct ptlrpc_request   *req;
101         struct ost_body         *body;
102         int                      rc;
103
104         ENTRY;
105         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
106         if (req == NULL)
107                 RETURN(-ENOMEM);
108
109         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
110         if (rc) {
111                 ptlrpc_request_free(req);
112                 RETURN(rc);
113         }
114
115         osc_pack_req_body(req, oa);
116
117         ptlrpc_request_set_replen(req);
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
124         if (body == NULL)
125                 GOTO(out, rc = -EPROTO);
126
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
129
130         oa->o_blksize = cli_brw_size(exp->exp_obd);
131         oa->o_valid |= OBD_MD_FLBLKSZ;
132
133         EXIT;
134 out:
135         ptlrpc_req_finished(req);
136
137         return rc;
138 }
139
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
141                        struct obdo *oa)
142 {
143         struct ptlrpc_request   *req;
144         struct ost_body         *body;
145         int                      rc;
146
147         ENTRY;
148         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
149
150         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
151         if (req == NULL)
152                 RETURN(-ENOMEM);
153
154         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
155         if (rc) {
156                 ptlrpc_request_free(req);
157                 RETURN(rc);
158         }
159
160         osc_pack_req_body(req, oa);
161
162         ptlrpc_request_set_replen(req);
163
164         rc = ptlrpc_queue_wait(req);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
169         if (body == NULL)
170                 GOTO(out, rc = -EPROTO);
171
172         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
173
174         EXIT;
175 out:
176         ptlrpc_req_finished(req);
177
178         RETURN(rc);
179 }
180
181 static int osc_setattr_interpret(const struct lu_env *env,
182                                  struct ptlrpc_request *req, void *args, int rc)
183 {
184         struct osc_setattr_args *sa = args;
185         struct ost_body *body;
186
187         ENTRY;
188
189         if (rc != 0)
190                 GOTO(out, rc);
191
192         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193         if (body == NULL)
194                 GOTO(out, rc = -EPROTO);
195
196         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
197                              &body->oa);
198 out:
199         rc = sa->sa_upcall(sa->sa_cookie, rc);
200         RETURN(rc);
201 }
202
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204                       obd_enqueue_update_f upcall, void *cookie,
205                       struct ptlrpc_request_set *rqset)
206 {
207         struct ptlrpc_request   *req;
208         struct osc_setattr_args *sa;
209         int                      rc;
210
211         ENTRY;
212
213         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
214         if (req == NULL)
215                 RETURN(-ENOMEM);
216
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oa);
224
225         ptlrpc_request_set_replen(req);
226
227         /* do mds to ost setattr asynchronously */
228         if (!rqset) {
229                 /* Do not wait for response. */
230                 ptlrpcd_add_req(req);
231         } else {
232                 req->rq_interpret_reply = osc_setattr_interpret;
233
234                 sa = ptlrpc_req_async_args(sa, req);
235                 sa->sa_oa = oa;
236                 sa->sa_upcall = upcall;
237                 sa->sa_cookie = cookie;
238
239                 if (rqset == PTLRPCD_SET)
240                         ptlrpcd_add_req(req);
241                 else
242                         ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         if (rqset == PTLRPCD_SET)
328                 ptlrpcd_add_req(req);
329         else
330                 ptlrpc_set_add_req(rqset, req);
331
332         RETURN(0);
333 }
334
335 static int osc_create(const struct lu_env *env, struct obd_export *exp,
336                       struct obdo *oa)
337 {
338         struct ptlrpc_request *req;
339         struct ost_body       *body;
340         int                    rc;
341         ENTRY;
342
343         LASSERT(oa != NULL);
344         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
345         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
346
347         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
348         if (req == NULL)
349                 GOTO(out, rc = -ENOMEM);
350
351         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
352         if (rc) {
353                 ptlrpc_request_free(req);
354                 GOTO(out, rc);
355         }
356
357         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358         LASSERT(body);
359
360         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
361
362         ptlrpc_request_set_replen(req);
363
364         rc = ptlrpc_queue_wait(req);
365         if (rc)
366                 GOTO(out_req, rc);
367
368         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL)
370                 GOTO(out_req, rc = -EPROTO);
371
372         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
373         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
374
375         oa->o_blksize = cli_brw_size(exp->exp_obd);
376         oa->o_valid |= OBD_MD_FLBLKSZ;
377
378         CDEBUG(D_HA, "transno: %lld\n",
379                lustre_msg_get_transno(req->rq_repmsg));
380 out_req:
381         ptlrpc_req_finished(req);
382 out:
383         RETURN(rc);
384 }
385
386 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
387                    obd_enqueue_update_f upcall, void *cookie)
388 {
389         struct ptlrpc_request *req;
390         struct osc_setattr_args *sa;
391         struct obd_import *imp = class_exp2cliimp(exp);
392         struct ost_body *body;
393         int rc;
394
395         ENTRY;
396
397         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
398         if (req == NULL)
399                 RETURN(-ENOMEM);
400
401         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
402         if (rc < 0) {
403                 ptlrpc_request_free(req);
404                 RETURN(rc);
405         }
406
407         osc_set_io_portal(req);
408
409         ptlrpc_at_set_req_timeout(req);
410
411         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
412
413         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
414
415         ptlrpc_request_set_replen(req);
416
417         req->rq_interpret_reply = osc_setattr_interpret;
418         sa = ptlrpc_req_async_args(sa, req);
419         sa->sa_oa = oa;
420         sa->sa_upcall = upcall;
421         sa->sa_cookie = cookie;
422
423         ptlrpcd_add_req(req);
424
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(osc_punch_send);
428
429 static int osc_sync_interpret(const struct lu_env *env,
430                               struct ptlrpc_request *req, void *args, int rc)
431 {
432         struct osc_fsync_args *fa = args;
433         struct ost_body *body;
434         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
435         unsigned long valid = 0;
436         struct cl_object *obj;
437         ENTRY;
438
439         if (rc != 0)
440                 GOTO(out, rc);
441
442         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
443         if (body == NULL) {
444                 CERROR("can't unpack ost_body\n");
445                 GOTO(out, rc = -EPROTO);
446         }
447
448         *fa->fa_oa = body->oa;
449         obj = osc2cl(fa->fa_obj);
450
451         /* Update osc object's blocks attribute */
452         cl_object_attr_lock(obj);
453         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
454                 attr->cat_blocks = body->oa.o_blocks;
455                 valid |= CAT_BLOCKS;
456         }
457
458         if (valid != 0)
459                 cl_object_attr_update(env, obj, attr, valid);
460         cl_object_attr_unlock(obj);
461
462 out:
463         rc = fa->fa_upcall(fa->fa_cookie, rc);
464         RETURN(rc);
465 }
466
467 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
468                   obd_enqueue_update_f upcall, void *cookie,
469                   struct ptlrpc_request_set *rqset)
470 {
471         struct obd_export     *exp = osc_export(obj);
472         struct ptlrpc_request *req;
473         struct ost_body       *body;
474         struct osc_fsync_args *fa;
475         int                    rc;
476         ENTRY;
477
478         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
479         if (req == NULL)
480                 RETURN(-ENOMEM);
481
482         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
483         if (rc) {
484                 ptlrpc_request_free(req);
485                 RETURN(rc);
486         }
487
488         /* overload the size and blocks fields in the oa with start/end */
489         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
490         LASSERT(body);
491         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
492
493         ptlrpc_request_set_replen(req);
494         req->rq_interpret_reply = osc_sync_interpret;
495
496         fa = ptlrpc_req_async_args(fa, req);
497         fa->fa_obj = obj;
498         fa->fa_oa = oa;
499         fa->fa_upcall = upcall;
500         fa->fa_cookie = cookie;
501
502         if (rqset == PTLRPCD_SET)
503                 ptlrpcd_add_req(req);
504         else
505                 ptlrpc_set_add_req(rqset, req);
506
507         RETURN (0);
508 }
509
510 /* Find and cancel locally locks matched by @mode in the resource found by
511  * @objid. Found locks are added into @cancel list. Returns the amount of
512  * locks added to @cancels list. */
513 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
514                                    struct list_head *cancels,
515                                    enum ldlm_mode mode, __u64 lock_flags)
516 {
517         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
518         struct ldlm_res_id res_id;
519         struct ldlm_resource *res;
520         int count;
521         ENTRY;
522
523         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
524          * export) but disabled through procfs (flag in NS).
525          *
526          * This distinguishes from a case when ELC is not supported originally,
527          * when we still want to cancel locks in advance and just cancel them
528          * locally, without sending any RPC. */
529         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
530                 RETURN(0);
531
532         ostid_build_res_name(&oa->o_oi, &res_id);
533         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
534         if (IS_ERR(res))
535                 RETURN(0);
536
537         LDLM_RESOURCE_ADDREF(res);
538         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
539                                            lock_flags, 0, NULL);
540         LDLM_RESOURCE_DELREF(res);
541         ldlm_resource_putref(res);
542         RETURN(count);
543 }
544
545 static int osc_destroy_interpret(const struct lu_env *env,
546                                  struct ptlrpc_request *req, void *args, int rc)
547 {
548         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
549
550         atomic_dec(&cli->cl_destroy_in_flight);
551         wake_up(&cli->cl_destroy_waitq);
552
553         return 0;
554 }
555
556 static int osc_can_send_destroy(struct client_obd *cli)
557 {
558         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
559             cli->cl_max_rpcs_in_flight) {
560                 /* The destroy request can be sent */
561                 return 1;
562         }
563         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
564             cli->cl_max_rpcs_in_flight) {
565                 /*
566                  * The counter has been modified between the two atomic
567                  * operations.
568                  */
569                 wake_up(&cli->cl_destroy_waitq);
570         }
571         return 0;
572 }
573
574 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
575                        struct obdo *oa)
576 {
577         struct client_obd     *cli = &exp->exp_obd->u.cli;
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         LIST_HEAD(cancels);
581         int rc, count;
582         ENTRY;
583
584         if (!oa) {
585                 CDEBUG(D_INFO, "oa NULL\n");
586                 RETURN(-EINVAL);
587         }
588
589         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
590                                         LDLM_FL_DISCARD_DATA);
591
592         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
593         if (req == NULL) {
594                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
595                 RETURN(-ENOMEM);
596         }
597
598         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
599                                0, &cancels, count);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
606         ptlrpc_at_set_req_timeout(req);
607
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
611
612         ptlrpc_request_set_replen(req);
613
614         req->rq_interpret_reply = osc_destroy_interpret;
615         if (!osc_can_send_destroy(cli)) {
616                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
617
618                 /*
619                  * Wait until the number of on-going destroy RPCs drops
620                  * under max_rpc_in_flight
621                  */
622                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
623                                             osc_can_send_destroy(cli), &lwi);
624                 if (rc) {
625                         ptlrpc_req_finished(req);
626                         RETURN(rc);
627                 }
628         }
629
630         /* Do not wait for response */
631         ptlrpcd_add_req(req);
632         RETURN(0);
633 }
634
/**
 * Fill the dirty/undirty/grant/dropped fields of \a oa from the cache and
 * grant counters of \a cli.
 *
 * \a oa must not have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set on entry
 * (asserted); both bits are set here.  The counters are sampled under
 * cl_loi_list_lock, and cl_lost_grant is reset to zero once it has been
 * copied into o_dropped.
 *
 * \param[in]     cli           client whose counters are reported
 * \param[in,out] oa            obdo to fill in
 * \param[in]     writing_bytes not used by this function
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        /* with GRANT_PARAM the dirty amount comes from cl_dirty_grant,
         * otherwise it is the dirty page count converted to bytes */
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        /* the first three branches are sanity checks; each one logs an
         * error and reports o_undirty as 0 */
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic read here and the atomic increments elsewhere
                 * are not covered by a lock, so they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                /* pages for (max RPCs in flight + 1) full RPCs, but not
                 * less than cl_dirty_max_pages */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        /* round up to whole extents */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
698
699 void osc_update_next_shrink(struct client_obd *cli)
700 {
701         cli->cl_next_shrink_grant = ktime_get_seconds() +
702                                     cli->cl_grant_shrink_interval;
703
704         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
705                cli->cl_next_shrink_grant);
706 }
707
/**
 * Add \a grant to the available grant of \a cli.
 *
 * Takes cl_loi_list_lock itself, so the caller must not hold it.
 *
 * \param[in,out] cli   client whose cl_avail_grant is increased
 * \param[in]     grant amount to add
 */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
714
715 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
716 {
717         if (body->oa.o_valid & OBD_MD_FLGRANT) {
718                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
719                 __osc_update_grant(cli, body->oa.o_grant);
720         }
721 }
722
/**
 * grant thread data for shrinking space.
 *
 * A single file-local instance, client_gtd, is used by the grant work
 * handler and by the start/stop helpers.
 */
struct grant_thread_data {
        /* clients walked by the work handler, linked via cl_grant_chain */
        struct list_head        gtd_clients;
        /* held by the work handler while it walks gtd_clients */
        struct mutex            gtd_mutex;
        /* set on stop; the work handler does not re-arm itself once set */
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;
732
733 static int osc_shrink_grant_interpret(const struct lu_env *env,
734                                       struct ptlrpc_request *req,
735                                       void *args, int rc)
736 {
737         struct osc_grant_args *aa = args;
738         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
739         struct ost_body *body;
740
741         if (rc != 0) {
742                 __osc_update_grant(cli, aa->aa_oa->o_grant);
743                 GOTO(out, rc);
744         }
745
746         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
747         LASSERT(body);
748         osc_update_grant(cli, body);
749 out:
750         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
751         aa->aa_oa = NULL;
752
753         return rc;
754 }
755
756 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
757 {
758         spin_lock(&cli->cl_loi_list_lock);
759         oa->o_grant = cli->cl_avail_grant / 4;
760         cli->cl_avail_grant -= oa->o_grant;
761         spin_unlock(&cli->cl_loi_list_lock);
762         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
763                 oa->o_valid |= OBD_MD_FLFLAGS;
764                 oa->o_flags = 0;
765         }
766         oa->o_flags |= OBD_FL_SHRINK_GRANT;
767         osc_update_next_shrink(cli);
768 }
769
770 /* Shrink the current grant, either from some large amount to enough for a
771  * full set of in-flight RPCs, or if we have already shrunk to that limit
772  * then to enough for a single RPC.  This avoids keeping more grant than
773  * needed, and avoids shrinking the grant piecemeal. */
774 static int osc_shrink_grant(struct client_obd *cli)
775 {
776         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
777                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
778
779         spin_lock(&cli->cl_loi_list_lock);
780         if (cli->cl_avail_grant <= target_bytes)
781                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
782         spin_unlock(&cli->cl_loi_list_lock);
783
784         return osc_shrink_grant_to_target(cli, target_bytes);
785 }
786
787 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
788 {
789         int                     rc = 0;
790         struct ost_body        *body;
791         ENTRY;
792
793         spin_lock(&cli->cl_loi_list_lock);
794         /* Don't shrink if we are already above or below the desired limit
795          * We don't want to shrink below a single RPC, as that will negatively
796          * impact block allocation and long-term performance. */
797         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
799
800         if (target_bytes >= cli->cl_avail_grant) {
801                 spin_unlock(&cli->cl_loi_list_lock);
802                 RETURN(0);
803         }
804         spin_unlock(&cli->cl_loi_list_lock);
805
806         OBD_ALLOC_PTR(body);
807         if (!body)
808                 RETURN(-ENOMEM);
809
810         osc_announce_cached(cli, &body->oa, 0);
811
812         spin_lock(&cli->cl_loi_list_lock);
813         if (target_bytes >= cli->cl_avail_grant) {
814                 /* available grant has changed since target calculation */
815                 spin_unlock(&cli->cl_loi_list_lock);
816                 GOTO(out_free, rc = 0);
817         }
818         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
819         cli->cl_avail_grant = target_bytes;
820         spin_unlock(&cli->cl_loi_list_lock);
821         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
822                 body->oa.o_valid |= OBD_MD_FLFLAGS;
823                 body->oa.o_flags = 0;
824         }
825         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
826         osc_update_next_shrink(cli);
827
828         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
829                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
830                                 sizeof(*body), body, NULL);
831         if (rc != 0)
832                 __osc_update_grant(cli, body->oa.o_grant);
833 out_free:
834         OBD_FREE_PTR(body);
835         RETURN(rc);
836 }
837
838 static int osc_should_shrink_grant(struct client_obd *client)
839 {
840         time64_t next_shrink = client->cl_next_shrink_grant;
841
842         if (client->cl_import == NULL)
843                 return 0;
844
845         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
846             client->cl_import->imp_grant_shrink_disabled)
847                 return 0;
848
849         if (ktime_get_seconds() >= next_shrink - 5) {
850                 /* Get the current RPC size directly, instead of going via:
851                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
852                  * Keep comment here so that it can be found by searching. */
853                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
854
855                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
856                     client->cl_avail_grant > brw_size)
857                         return 1;
858                 else
859                         osc_update_next_shrink(client);
860         }
861         return 0;
862 }
863
/* upper bound on shrink calls issued by one run of the grant work handler */
#define GRANT_SHRINK_RPC_BATCH  100

/* delayed work item that runs osc_grant_work_handler() */
static struct delayed_work work;
867
868 static void osc_grant_work_handler(struct work_struct *data)
869 {
870         struct client_obd *cli;
871         int rpc_sent;
872         bool init_next_shrink = true;
873         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
874
875         rpc_sent = 0;
876         mutex_lock(&client_gtd.gtd_mutex);
877         list_for_each_entry(cli, &client_gtd.gtd_clients,
878                             cl_grant_chain) {
879                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
880                     osc_should_shrink_grant(cli)) {
881                         osc_shrink_grant(cli);
882                         rpc_sent++;
883                 }
884
885                 if (!init_next_shrink) {
886                         if (cli->cl_next_shrink_grant < next_shrink &&
887                             cli->cl_next_shrink_grant > ktime_get_seconds())
888                                 next_shrink = cli->cl_next_shrink_grant;
889                 } else {
890                         init_next_shrink = false;
891                         next_shrink = cli->cl_next_shrink_grant;
892                 }
893         }
894         mutex_unlock(&client_gtd.gtd_mutex);
895
896         if (client_gtd.gtd_stopped == 1)
897                 return;
898
899         if (next_shrink > ktime_get_seconds()) {
900                 time64_t delay = next_shrink - ktime_get_seconds();
901
902                 schedule_delayed_work(&work, cfs_time_seconds(delay));
903         } else {
904                 schedule_work(&work.work);
905         }
906 }
907
908 void osc_schedule_grant_work(void)
909 {
910         cancel_delayed_work_sync(&work);
911         schedule_work(&work.work);
912 }
913
914 /**
915  * Start grant thread for returing grant to server for idle clients.
916  */
917 static int osc_start_grant_work(void)
918 {
919         client_gtd.gtd_stopped = 0;
920         mutex_init(&client_gtd.gtd_mutex);
921         INIT_LIST_HEAD(&client_gtd.gtd_clients);
922
923         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
924         schedule_work(&work.work);
925
926         return 0;
927 }
928
929 static void osc_stop_grant_work(void)
930 {
931         client_gtd.gtd_stopped = 1;
932         cancel_delayed_work_sync(&work);
933 }
934
935 static void osc_add_grant_list(struct client_obd *client)
936 {
937         mutex_lock(&client_gtd.gtd_mutex);
938         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
939         mutex_unlock(&client_gtd.gtd_mutex);
940 }
941
942 static void osc_del_grant_list(struct client_obd *client)
943 {
944         if (list_empty(&client->cl_grant_chain))
945                 return;
946
947         mutex_lock(&client_gtd.gtd_mutex);
948         list_del_init(&client->cl_grant_chain);
949         mutex_unlock(&client_gtd.gtd_mutex);
950 }
951
952 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
953 {
954         /*
955          * ocd_grant is the total grant amount we're expect to hold: if we've
956          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
957          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
958          * dirty.
959          *
960          * race is tolerable here: if we're evicted, but imp_state already
961          * left EVICTED state, then cl_dirty_pages must be 0 already.
962          */
963         spin_lock(&cli->cl_loi_list_lock);
964         cli->cl_avail_grant = ocd->ocd_grant;
965         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
966                 cli->cl_avail_grant -= cli->cl_reserved_grant;
967                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
968                         cli->cl_avail_grant -= cli->cl_dirty_grant;
969                 else
970                         cli->cl_avail_grant -=
971                                         cli->cl_dirty_pages << PAGE_SHIFT;
972         }
973
974         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
975                 u64 size;
976                 int chunk_mask;
977
978                 /* overhead for each extent insertion */
979                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
980                 /* determine the appropriate chunk size used by osc_extent. */
981                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
982                                           ocd->ocd_grant_blkbits);
983                 /* max_pages_per_rpc must be chunk aligned */
984                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
985                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
986                                              ~chunk_mask) & chunk_mask;
987                 /* determine maximum extent size, in #pages */
988                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
989                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
990                 if (cli->cl_max_extent_pages == 0)
991                         cli->cl_max_extent_pages = 1;
992         } else {
993                 cli->cl_grant_extent_tax = 0;
994                 cli->cl_chunkbits = PAGE_SHIFT;
995                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
996         }
997         spin_unlock(&cli->cl_loi_list_lock);
998
999         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1000                 "chunk bits: %d cl_max_extent_pages: %d\n",
1001                 cli_name(cli),
1002                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1003                 cli->cl_max_extent_pages);
1004
1005         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1006                 osc_add_grant_list(cli);
1007 }
1008 EXPORT_SYMBOL(osc_init_grant);
1009
1010 /* We assume that the reason this OSC got a short read is because it read
1011  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1012  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1013  * this stripe never got written at or beyond this stripe offset yet. */
1014 static void handle_short_read(int nob_read, size_t page_count,
1015                               struct brw_page **pga)
1016 {
1017         char *ptr;
1018         int i = 0;
1019
1020         /* skip bytes read OK */
1021         while (nob_read > 0) {
1022                 LASSERT (page_count > 0);
1023
1024                 if (pga[i]->count > nob_read) {
1025                         /* EOF inside this page */
1026                         ptr = kmap(pga[i]->pg) +
1027                                 (pga[i]->off & ~PAGE_MASK);
1028                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1029                         kunmap(pga[i]->pg);
1030                         page_count--;
1031                         i++;
1032                         break;
1033                 }
1034
1035                 nob_read -= pga[i]->count;
1036                 page_count--;
1037                 i++;
1038         }
1039
1040         /* zero remaining pages */
1041         while (page_count-- > 0) {
1042                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1043                 memset(ptr, 0, pga[i]->count);
1044                 kunmap(pga[i]->pg);
1045                 i++;
1046         }
1047 }
1048
1049 static int check_write_rcs(struct ptlrpc_request *req,
1050                            int requested_nob, int niocount,
1051                            size_t page_count, struct brw_page **pga)
1052 {
1053         int     i;
1054         __u32   *remote_rcs;
1055
1056         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1057                                                   sizeof(*remote_rcs) *
1058                                                   niocount);
1059         if (remote_rcs == NULL) {
1060                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1061                 return(-EPROTO);
1062         }
1063
1064         /* return error if any niobuf was in error */
1065         for (i = 0; i < niocount; i++) {
1066                 if ((int)remote_rcs[i] < 0) {
1067                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1068                                i, remote_rcs[i], req);
1069                         return remote_rcs[i];
1070                 }
1071
1072                 if (remote_rcs[i] != 0) {
1073                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1074                                 i, remote_rcs[i], req);
1075                         return(-EPROTO);
1076                 }
1077         }
1078         if (req->rq_bulk != NULL &&
1079             req->rq_bulk->bd_nob_transferred != requested_nob) {
1080                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1081                        req->rq_bulk->bd_nob_transferred, requested_nob);
1082                 return(-EPROTO);
1083         }
1084
1085         return (0);
1086 }
1087
1088 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1089 {
1090         if (p1->flag != p2->flag) {
1091                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1092                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1093                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1094
1095                 /* warn if we try to combine flags that we don't know to be
1096                  * safe to combine */
1097                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1098                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1099                               "report this at https://jira.whamcloud.com/\n",
1100                               p1->flag, p2->flag);
1101                 }
1102                 return 0;
1103         }
1104
1105         return (p1->off + p1->count == p2->off);
1106 }
1107
1108 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1109 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1110                                    size_t pg_count, struct brw_page **pga,
1111                                    int opc, obd_dif_csum_fn *fn,
1112                                    int sector_size,
1113                                    u32 *check_sum)
1114 {
1115         struct ahash_request *req;
1116         /* Used Adler as the default checksum type on top of DIF tags */
1117         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1118         struct page *__page;
1119         unsigned char *buffer;
1120         __u16 *guard_start;
1121         unsigned int bufsize;
1122         int guard_number;
1123         int used_number = 0;
1124         int used;
1125         u32 cksum;
1126         int rc = 0;
1127         int i = 0;
1128
1129         LASSERT(pg_count > 0);
1130
1131         __page = alloc_page(GFP_KERNEL);
1132         if (__page == NULL)
1133                 return -ENOMEM;
1134
1135         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1136         if (IS_ERR(req)) {
1137                 rc = PTR_ERR(req);
1138                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1139                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1140                 GOTO(out, rc);
1141         }
1142
1143         buffer = kmap(__page);
1144         guard_start = (__u16 *)buffer;
1145         guard_number = PAGE_SIZE / sizeof(*guard_start);
1146         while (nob > 0 && pg_count > 0) {
1147                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1148
1149                 /* corrupt the data before we compute the checksum, to
1150                  * simulate an OST->client data error */
1151                 if (unlikely(i == 0 && opc == OST_READ &&
1152                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1153                         unsigned char *ptr = kmap(pga[i]->pg);
1154                         int off = pga[i]->off & ~PAGE_MASK;
1155
1156                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1157                         kunmap(pga[i]->pg);
1158                 }
1159
1160                 /*
1161                  * The left guard number should be able to hold checksums of a
1162                  * whole page
1163                  */
1164                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1165                                                   pga[i]->off & ~PAGE_MASK,
1166                                                   count,
1167                                                   guard_start + used_number,
1168                                                   guard_number - used_number,
1169                                                   &used, sector_size,
1170                                                   fn);
1171                 if (rc)
1172                         break;
1173
1174                 used_number += used;
1175                 if (used_number == guard_number) {
1176                         cfs_crypto_hash_update_page(req, __page, 0,
1177                                 used_number * sizeof(*guard_start));
1178                         used_number = 0;
1179                 }
1180
1181                 nob -= pga[i]->count;
1182                 pg_count--;
1183                 i++;
1184         }
1185         kunmap(__page);
1186         if (rc)
1187                 GOTO(out, rc);
1188
1189         if (used_number != 0)
1190                 cfs_crypto_hash_update_page(req, __page, 0,
1191                         used_number * sizeof(*guard_start));
1192
1193         bufsize = sizeof(cksum);
1194         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1195
1196         /* For sending we only compute the wrong checksum instead
1197          * of corrupting the data so it is still correct on a redo */
1198         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1199                 cksum++;
1200
1201         *check_sum = cksum;
1202 out:
1203         __free_page(__page);
1204         return rc;
1205 }
1206 #else /* !CONFIG_CRC_T10DIF */
1207 #define obd_dif_ip_fn NULL
1208 #define obd_dif_crc_fn NULL
1209 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1210         -EOPNOTSUPP
1211 #endif /* CONFIG_CRC_T10DIF */
1212
1213 static int osc_checksum_bulk(int nob, size_t pg_count,
1214                              struct brw_page **pga, int opc,
1215                              enum cksum_types cksum_type,
1216                              u32 *cksum)
1217 {
1218         int                             i = 0;
1219         struct ahash_request           *req;
1220         unsigned int                    bufsize;
1221         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1222
1223         LASSERT(pg_count > 0);
1224
1225         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1226         if (IS_ERR(req)) {
1227                 CERROR("Unable to initialize checksum hash %s\n",
1228                        cfs_crypto_hash_name(cfs_alg));
1229                 return PTR_ERR(req);
1230         }
1231
1232         while (nob > 0 && pg_count > 0) {
1233                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1234
1235                 /* corrupt the data before we compute the checksum, to
1236                  * simulate an OST->client data error */
1237                 if (i == 0 && opc == OST_READ &&
1238                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1239                         unsigned char *ptr = kmap(pga[i]->pg);
1240                         int off = pga[i]->off & ~PAGE_MASK;
1241
1242                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1243                         kunmap(pga[i]->pg);
1244                 }
1245                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1246                                             pga[i]->off & ~PAGE_MASK,
1247                                             count);
1248                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1249                                (int)(pga[i]->off & ~PAGE_MASK));
1250
1251                 nob -= pga[i]->count;
1252                 pg_count--;
1253                 i++;
1254         }
1255
1256         bufsize = sizeof(*cksum);
1257         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1258
1259         /* For sending we only compute the wrong checksum instead
1260          * of corrupting the data so it is still correct on a redo */
1261         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1262                 (*cksum)++;
1263
1264         return 0;
1265 }
1266
1267 static int osc_checksum_bulk_rw(const char *obd_name,
1268                                 enum cksum_types cksum_type,
1269                                 int nob, size_t pg_count,
1270                                 struct brw_page **pga, int opc,
1271                                 u32 *check_sum)
1272 {
1273         obd_dif_csum_fn *fn = NULL;
1274         int sector_size = 0;
1275         int rc;
1276
1277         ENTRY;
1278         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1279
1280         if (fn)
1281                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1282                                              opc, fn, sector_size, check_sum);
1283         else
1284                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1285                                        check_sum);
1286
1287         RETURN(rc);
1288 }
1289
/**
 * Build a bulk read or write RPC for the pages in \a pga.
 *
 * The request is an OST_WRITE taken from osc_rq_pool when \a cmd has
 * OBD_BRW_WRITE set, an OST_READ otherwise.  Pages that can_merge_pages()
 * accepts are described by a single remote niobuf.  When the transfer fits
 * in cl_max_short_io_bytes, needs only one niobuf and the import supports
 * short io, the data travels inside the RPC itself (RMF_SHORT_IO) and no
 * bulk descriptor is set up; otherwise every page is added to a bulk
 * descriptor.  For writes the checksum is computed here when checksums are
 * enabled and the security flavor does not already cover bulk data.
 *
 * \param[in] cmd		OBD_BRW_* command bits; OBD_BRW_WRITE selects
 *				a write
 * \param[in] cli		client the request is sent for
 * \param[in,out] oa		object attributes copied into the request; on
 *				writes its checksum fields are updated too
 * \param[in] page_count	number of entries in \a pga, must be > 0
 * \param[in] pga		pages to transfer, in ascending offset order
 * \param[out] reqp		the prepared request, set on success only
 * \param[in] resend		non-zero marks the request OBD_FL_RECOV_RESEND
 *
 * \retval 0		success, *reqp holds the request
 * \retval negative	errno; the request, if it was allocated, has been
 *			released
 */
1290 static int
1291 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1292                      u32 page_count, struct brw_page **pga,
1293                      struct ptlrpc_request **reqp, int resend)
1294 {
1295         struct ptlrpc_request   *req;
1296         struct ptlrpc_bulk_desc *desc;
1297         struct ost_body         *body;
1298         struct obd_ioobj        *ioobj;
1299         struct niobuf_remote    *niobuf;
1300         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1301         struct osc_brw_async_args *aa;
1302         struct req_capsule      *pill;
1303         struct brw_page *pg_prev;
1304         void *short_io_buf;
1305         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1306
1307         ENTRY;
1308         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1309                 RETURN(-ENOMEM); /* Recoverable */
1310         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1311                 RETURN(-EINVAL); /* Fatal */
1312
1313         if ((cmd & OBD_BRW_WRITE) != 0) {
1314                 opc = OST_WRITE;
1315                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1316                                                 osc_rq_pool,
1317                                                 &RQF_OST_BRW_WRITE);
1318         } else {
1319                 opc = OST_READ;
1320                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1321         }
1322         if (req == NULL)
1323                 RETURN(-ENOMEM);
1324
        /* one niobuf per run of mergeable pages */
1325         for (niocount = i = 1; i < page_count; i++) {
1326                 if (!can_merge_pages(pga[i - 1], pga[i]))
1327                         niocount++;
1328         }
1329
1330         pill = &req->rq_pill;
1331         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1332                              sizeof(*ioobj));
1333         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1334                              niocount * sizeof(*niobuf));
1335
        /* total payload size, candidate for a short io transfer */
1336         for (i = 0; i < page_count; i++)
1337                 short_io_size += pga[i]->count;
1338
1339         /* Check if read/write is small enough to be a short io. */
1340         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1341             !imp_connect_shortio(cli->cl_import))
1342                 short_io_size = 0;
1343
        /* short io data lives in the request for writes, in the reply for
         * reads */
1344         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1345                              opc == OST_READ ? 0 : short_io_size);
1346         if (opc == OST_READ)
1347                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1348                                      short_io_size);
1349
1350         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1351         if (rc) {
1352                 ptlrpc_request_free(req);
1353                 RETURN(rc);
1354         }
1355         osc_set_io_portal(req);
1356
1357         ptlrpc_at_set_req_timeout(req);
1358         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1359          * retry logic */
1360         req->rq_no_retry_einprogress = 1;
1361
        /* short io needs no bulk descriptor */
1362         if (short_io_size != 0) {
1363                 desc = NULL;
1364                 short_io_buf = NULL;
1365                 goto no_bulk;
1366         }
1367
1368         desc = ptlrpc_prep_bulk_imp(req, page_count,
1369                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1370                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1371                         PTLRPC_BULK_PUT_SINK) |
1372                         PTLRPC_BULK_BUF_KIOV,
1373                 OST_BULK_PORTAL,
1374                 &ptlrpc_bulk_kiov_pin_ops);
1375
1376         if (desc == NULL)
1377                 GOTO(out, rc = -ENOMEM);
1378         /* NB request now owns desc and will free it when it gets freed */
1379 no_bulk:
1380         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1381         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1382         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1383         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1384
1385         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1386
1387         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1388          * and from_kgid(), because they are asynchronous. Fortunately, variable
1389          * oa contains valid o_uid and o_gid in these two operations.
1390          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1391          * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1392          * other process logic */
1393         body->oa.o_uid = oa->o_uid;
1394         body->oa.o_gid = oa->o_gid;
1395
1396         obdo_to_ioobj(oa, ioobj);
1397         ioobj->ioo_bufcnt = niocount;
1398         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1399          * that might be send for this request.  The actual number is decided
1400          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1401          * "max - 1" for old client compatibility sending "0", and also so the
1402          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1403         if (desc != NULL)
1404                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1405         else /* short io */
1406                 ioobj_max_brw_set(ioobj, 0);
1407
1408         if (short_io_size != 0) {
                /* o_flags is only meaningful once OBD_MD_FLFLAGS is set */
1409                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1410                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1411                         body->oa.o_flags = 0;
1412                 }
1413                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1414                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1415                        short_io_size);
1416                 if (opc == OST_WRITE) {
1417                         short_io_buf = req_capsule_client_get(pill,
1418                                                               &RMF_SHORT_IO);
1419                         LASSERT(short_io_buf != NULL);
1420                 }
1421         }
1422
1423         LASSERT(page_count > 0);
1424         pg_prev = pga[0];
        /* niobuf advances with every page and is stepped back whenever a
         * page is merged into the previous niobuf */
1425         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1426                 struct brw_page *pg = pga[i];
1427                 int poff = pg->off & ~PAGE_MASK;
1428
1429                 LASSERT(pg->count > 0);
1430                 /* make sure there is no gap in the middle of page array */
1431                 LASSERTF(page_count == 1 ||
1432                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1433                           ergo(i > 0 && i < page_count - 1,
1434                                poff == 0 && pg->count == PAGE_SIZE)   &&
1435                           ergo(i == page_count - 1, poff == 0)),
1436                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1437                          i, page_count, pg, pg->off, pg->count);
1438                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1439                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1440                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1441                          i, page_count,
1442                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1443                          pg_prev->pg, page_private(pg_prev->pg),
1444                          pg_prev->pg->index, pg_prev->off);
1445                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1446                         (pg->flag & OBD_BRW_SRVLOCK));
1447                 if (short_io_size != 0 && opc == OST_WRITE) {
                        /* short io write: copy the data into the request */
1448                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1449
1450                         LASSERT(short_io_size >= requested_nob + pg->count);
1451                         memcpy(short_io_buf + requested_nob,
1452                                ptr + poff,
1453                                pg->count);
1454                         ll_kunmap_atomic(ptr, KM_USER0);
1455                 } else if (short_io_size == 0) {
1456                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1457                                                          pg->count);
1458                 }
1459                 requested_nob += pg->count;
1460
1461                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1462                         niobuf--;
1463                         niobuf->rnb_len += pg->count;
1464                 } else {
1465                         niobuf->rnb_offset = pg->off;
1466                         niobuf->rnb_len    = pg->count;
1467                         niobuf->rnb_flags  = pg->flag;
1468                 }
1469                 pg_prev = pg;
1470         }
1471
        /* the loop must have filled exactly niocount niobufs */
1472         LASSERTF((void *)(niobuf - niocount) ==
1473                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1474                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1475                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1476
1477         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1478         if (resend) {
1479                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1480                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1481                         body->oa.o_flags = 0;
1482                 }
1483                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1484         }
1485
1486         if (osc_should_shrink_grant(cli))
1487                 osc_shrink_grant_local(cli, &body->oa);
1488
1489         /* size[REQ_REC_OFF] still sizeof (*body) */
1490         if (opc == OST_WRITE) {
1491                 if (cli->cl_checksum &&
1492                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1493                         /* store cl_cksum_type in a local variable since
1494                          * it can be changed via lprocfs */
1495                         enum cksum_types cksum_type = cli->cl_cksum_type;
1496
1497                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1498                                 body->oa.o_flags = 0;
1499
1500                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1501                                                                 cksum_type);
1502                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1503
1504                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1505                                                   requested_nob, page_count,
1506                                                   pga, OST_WRITE,
1507                                                   &body->oa.o_cksum);
1508                         if (rc < 0) {
1509                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1510                                        rc);
1511                                 GOTO(out, rc);
1512                         }
1513                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1514                                body->oa.o_cksum);
1515
1516                         /* save this in 'oa', too, for later checking */
1517                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1518                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1519                                                            cksum_type);
1520                 } else {
1521                         /* clear out the checksum flag, in case this is a
1522                          * resend but cl_checksum is no longer set. b=11238 */
1523                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1524                 }
1525                 oa->o_cksum = body->oa.o_cksum;
1526                 /* 1 RC per niobuf */
1527                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1528                                      sizeof(__u32) * niocount);
1529         } else {
1530                 if (cli->cl_checksum &&
1531                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1532                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1533                                 body->oa.o_flags = 0;
1534                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1535                                 cli->cl_cksum_type);
1536                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1537                 }
1538
1539                 /* Client cksum has been already copied to wire obdo in previous
1540                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1541                  * resent due to cksum error, this will allow Server to
1542                  * check+dump pages on its side */
1543         }
1544         ptlrpc_request_set_replen(req);
1545
        /* record what the reply handling will need later */
1546         aa = ptlrpc_req_async_args(aa, req);
1547         aa->aa_oa = oa;
1548         aa->aa_requested_nob = requested_nob;
1549         aa->aa_nio_count = niocount;
1550         aa->aa_page_count = page_count;
1551         aa->aa_resends = 0;
1552         aa->aa_ppga = pga;
1553         aa->aa_cli = cli;
1554         INIT_LIST_HEAD(&aa->aa_oaps);
1555
1556         *reqp = req;
1557         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1558         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1559                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1560                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1561         RETURN(0);
1562
1563  out:
        /* error after packing: drop the request (and with it the bulk
         * descriptor, if one was attached) */
1564         ptlrpc_req_finished(req);
1565         RETURN(rc);
1566 }
1567
/* Path of the checksum error dump file; filled in by dump_all_bulk_pages().
 * NOTE(review): a single global buffer with no locking visible here, so
 * concurrent dumps would overwrite each other's name - confirm callers. */
1568 char dbgcksum_file_name[PATH_MAX];
1569
1570 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1571                                 struct brw_page **pga, __u32 server_cksum,
1572                                 __u32 client_cksum)
1573 {
1574         struct file *filp;
1575         int rc, i;
1576         unsigned int len;
1577         char *buf;
1578
1579         /* will only keep dump of pages on first error for the same range in
1580          * file/fid, not during the resends/retries. */
1581         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1582                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1583                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1584                   libcfs_debug_file_path_arr :
1585                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1586                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1587                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1589                  pga[0]->off,
1590                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1591                  client_cksum, server_cksum);
1592         filp = filp_open(dbgcksum_file_name,
1593                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1594         if (IS_ERR(filp)) {
1595                 rc = PTR_ERR(filp);
1596                 if (rc == -EEXIST)
1597                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1598                                "checksum error: rc = %d\n", dbgcksum_file_name,
1599                                rc);
1600                 else
1601                         CERROR("%s: can't open to dump pages with checksum "
1602                                "error: rc = %d\n", dbgcksum_file_name, rc);
1603                 return;
1604         }
1605
1606         for (i = 0; i < page_count; i++) {
1607                 len = pga[i]->count;
1608                 buf = kmap(pga[i]->pg);
1609                 while (len != 0) {
1610                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1611                         if (rc < 0) {
1612                                 CERROR("%s: wanted to write %u but got %d "
1613                                        "error\n", dbgcksum_file_name, len, rc);
1614                                 break;
1615                         }
1616                         len -= rc;
1617                         buf += rc;
1618                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1619                                dbgcksum_file_name, rc);
1620                 }
1621                 kunmap(pga[i]->pg);
1622         }
1623
1624         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1625         if (rc)
1626                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1627         filp_close(filp, NULL);
1628 }
1629
1630 static int
1631 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1632                      __u32 client_cksum, __u32 server_cksum,
1633                      struct osc_brw_async_args *aa)
1634 {
1635         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1636         enum cksum_types cksum_type;
1637         obd_dif_csum_fn *fn = NULL;
1638         int sector_size = 0;
1639         __u32 new_cksum;
1640         char *msg;
1641         int rc;
1642
1643         if (server_cksum == client_cksum) {
1644                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1645                 return 0;
1646         }
1647
1648         if (aa->aa_cli->cl_checksum_dump)
1649                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1650                                     server_cksum, client_cksum);
1651
1652         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1653                                            oa->o_flags : 0);
1654
1655         switch (cksum_type) {
1656         case OBD_CKSUM_T10IP512:
1657                 fn = obd_dif_ip_fn;
1658                 sector_size = 512;
1659                 break;
1660         case OBD_CKSUM_T10IP4K:
1661                 fn = obd_dif_ip_fn;
1662                 sector_size = 4096;
1663                 break;
1664         case OBD_CKSUM_T10CRC512:
1665                 fn = obd_dif_crc_fn;
1666                 sector_size = 512;
1667                 break;
1668         case OBD_CKSUM_T10CRC4K:
1669                 fn = obd_dif_crc_fn;
1670                 sector_size = 4096;
1671                 break;
1672         default:
1673                 break;
1674         }
1675
1676         if (fn)
1677                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1678                                              aa->aa_page_count, aa->aa_ppga,
1679                                              OST_WRITE, fn, sector_size,
1680                                              &new_cksum);
1681         else
1682                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1683                                        aa->aa_ppga, OST_WRITE, cksum_type,
1684                                        &new_cksum);
1685
1686         if (rc < 0)
1687                 msg = "failed to calculate the client write checksum";
1688         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1689                 msg = "the server did not use the checksum type specified in "
1690                       "the original request - likely a protocol problem";
1691         else if (new_cksum == server_cksum)
1692                 msg = "changed on the client after we checksummed it - "
1693                       "likely false positive due to mmap IO (bug 11742)";
1694         else if (new_cksum == client_cksum)
1695                 msg = "changed in transit before arrival at OST";
1696         else
1697                 msg = "changed in transit AND doesn't match the original - "
1698                       "likely false positive due to mmap IO (bug 11742)";
1699
1700         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1701                            DFID " object "DOSTID" extent [%llu-%llu], original "
1702                            "client csum %x (type %x), server csum %x (type %x),"
1703                            " client csum now %x\n",
1704                            obd_name, msg, libcfs_nid2str(peer->nid),
1705                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1706                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1707                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1708                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1709                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1710                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1711                            client_cksum,
1712                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1713                            server_cksum, cksum_type, new_cksum);
1714         return 1;
1715 }
1716
1717 /* Note rc enters this function as number of bytes transferred */
1718 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1719 {
1720         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1721         struct client_obd *cli = aa->aa_cli;
1722         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1723         const struct lnet_process_id *peer =
1724                 &req->rq_import->imp_connection->c_peer;
1725         struct ost_body *body;
1726         u32 client_cksum = 0;
1727
1728         ENTRY;
1729
1730         if (rc < 0 && rc != -EDQUOT) {
1731                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1732                 RETURN(rc);
1733         }
1734
1735         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1736         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1737         if (body == NULL) {
1738                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1739                 RETURN(-EPROTO);
1740         }
1741
1742         /* set/clear over quota flag for a uid/gid/projid */
1743         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1744             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1745                 unsigned qid[LL_MAXQUOTAS] = {
1746                                          body->oa.o_uid, body->oa.o_gid,
1747                                          body->oa.o_projid };
1748                 CDEBUG(D_QUOTA,
1749                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1750                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1751                        body->oa.o_valid, body->oa.o_flags);
1752                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1753                                        body->oa.o_flags);
1754         }
1755
1756         osc_update_grant(cli, body);
1757
1758         if (rc < 0)
1759                 RETURN(rc);
1760
1761         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1762                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1763
1764         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1765                 if (rc > 0) {
1766                         CERROR("%s: unexpected positive size %d\n",
1767                                obd_name, rc);
1768                         RETURN(-EPROTO);
1769                 }
1770
1771                 if (req->rq_bulk != NULL &&
1772                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1773                         RETURN(-EAGAIN);
1774
1775                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1776                     check_write_checksum(&body->oa, peer, client_cksum,
1777                                          body->oa.o_cksum, aa))
1778                         RETURN(-EAGAIN);
1779
1780                 rc = check_write_rcs(req, aa->aa_requested_nob,
1781                                      aa->aa_nio_count, aa->aa_page_count,
1782                                      aa->aa_ppga);
1783                 GOTO(out, rc);
1784         }
1785
1786         /* The rest of this function executes only for OST_READs */
1787
1788         if (req->rq_bulk == NULL) {
1789                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1790                                           RCL_SERVER);
1791                 LASSERT(rc == req->rq_status);
1792         } else {
1793                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1794                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1795         }
1796         if (rc < 0)
1797                 GOTO(out, rc = -EAGAIN);
1798
1799         if (rc > aa->aa_requested_nob) {
1800                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1801                        rc, aa->aa_requested_nob);
1802                 RETURN(-EPROTO);
1803         }
1804
1805         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1806                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1807                        rc, req->rq_bulk->bd_nob_transferred);
1808                 RETURN(-EPROTO);
1809         }
1810
1811         if (req->rq_bulk == NULL) {
1812                 /* short io */
1813                 int nob, pg_count, i = 0;
1814                 unsigned char *buf;
1815
1816                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1817                 pg_count = aa->aa_page_count;
1818                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1819                                                    rc);
1820                 nob = rc;
1821                 while (nob > 0 && pg_count > 0) {
1822                         unsigned char *ptr;
1823                         int count = aa->aa_ppga[i]->count > nob ?
1824                                     nob : aa->aa_ppga[i]->count;
1825
1826                         CDEBUG(D_CACHE, "page %p count %d\n",
1827                                aa->aa_ppga[i]->pg, count);
1828                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1829                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1830                                count);
1831                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1832
1833                         buf += count;
1834                         nob -= count;
1835                         i++;
1836                         pg_count--;
1837                 }
1838         }
1839
1840         if (rc < aa->aa_requested_nob)
1841                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1842
1843         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1844                 static int cksum_counter;
1845                 u32        server_cksum = body->oa.o_cksum;
1846                 char      *via = "";
1847                 char      *router = "";
1848                 enum cksum_types cksum_type;
1849                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1850                         body->oa.o_flags : 0;
1851
1852                 cksum_type = obd_cksum_type_unpack(o_flags);
1853                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1854                                           aa->aa_page_count, aa->aa_ppga,
1855                                           OST_READ, &client_cksum);
1856                 if (rc < 0)
1857                         GOTO(out, rc);
1858
1859                 if (req->rq_bulk != NULL &&
1860                     peer->nid != req->rq_bulk->bd_sender) {
1861                         via = " via ";
1862                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1863                 }
1864
1865                 if (server_cksum != client_cksum) {
1866                         struct ost_body *clbody;
1867                         u32 page_count = aa->aa_page_count;
1868
1869                         clbody = req_capsule_client_get(&req->rq_pill,
1870                                                         &RMF_OST_BODY);
1871                         if (cli->cl_checksum_dump)
1872                                 dump_all_bulk_pages(&clbody->oa, page_count,
1873                                                     aa->aa_ppga, server_cksum,
1874                                                     client_cksum);
1875
1876                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1877                                            "%s%s%s inode "DFID" object "DOSTID
1878                                            " extent [%llu-%llu], client %x, "
1879                                            "server %x, cksum_type %x\n",
1880                                            obd_name,
1881                                            libcfs_nid2str(peer->nid),
1882                                            via, router,
1883                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1884                                                 clbody->oa.o_parent_seq : 0ULL,
1885                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1886                                                 clbody->oa.o_parent_oid : 0,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_ver : 0,
1889                                            POSTID(&body->oa.o_oi),
1890                                            aa->aa_ppga[0]->off,
1891                                            aa->aa_ppga[page_count-1]->off +
1892                                            aa->aa_ppga[page_count-1]->count - 1,
1893                                            client_cksum, server_cksum,
1894                                            cksum_type);
1895                         cksum_counter = 0;
1896                         aa->aa_oa->o_cksum = client_cksum;
1897                         rc = -EAGAIN;
1898                 } else {
1899                         cksum_counter++;
1900                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1901                         rc = 0;
1902                 }
1903         } else if (unlikely(client_cksum)) {
1904                 static int cksum_missed;
1905
1906                 cksum_missed++;
1907                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1908                         CERROR("%s: checksum %u requested from %s but not sent\n",
1909                                obd_name, cksum_missed,
1910                                libcfs_nid2str(peer->nid));
1911         } else {
1912                 rc = 0;
1913         }
1914 out:
1915         if (rc >= 0)
1916                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1917                                      aa->aa_oa, &body->oa);
1918
1919         RETURN(rc);
1920 }
1921
1922 static int osc_brw_redo_request(struct ptlrpc_request *request,
1923                                 struct osc_brw_async_args *aa, int rc)
1924 {
1925         struct ptlrpc_request *new_req;
1926         struct osc_brw_async_args *new_aa;
1927         struct osc_async_page *oap;
1928         ENTRY;
1929
1930         /* The below message is checked in replay-ost-single.sh test_8ae*/
1931         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1932                   "redo for recoverable error %d", rc);
1933
1934         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1935                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1936                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1937                                   aa->aa_ppga, &new_req, 1);
1938         if (rc)
1939                 RETURN(rc);
1940
1941         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1942                 if (oap->oap_request != NULL) {
1943                         LASSERTF(request == oap->oap_request,
1944                                  "request %p != oap_request %p\n",
1945                                  request, oap->oap_request);
1946                         if (oap->oap_interrupted) {
1947                                 ptlrpc_req_finished(new_req);
1948                                 RETURN(-EINTR);
1949                         }
1950                 }
1951         }
1952         /*
1953          * New request takes over pga and oaps from old request.
1954          * Note that copying a list_head doesn't work, need to move it...
1955          */
1956         aa->aa_resends++;
1957         new_req->rq_interpret_reply = request->rq_interpret_reply;
1958         new_req->rq_async_args = request->rq_async_args;
1959         new_req->rq_commit_cb = request->rq_commit_cb;
1960         /* cap resend delay to the current request timeout, this is similar to
1961          * what ptlrpc does (see after_reply()) */
1962         if (aa->aa_resends > new_req->rq_timeout)
1963                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1964         else
1965                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1966         new_req->rq_generation_set = 1;
1967         new_req->rq_import_generation = request->rq_import_generation;
1968
1969         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1970
1971         INIT_LIST_HEAD(&new_aa->aa_oaps);
1972         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1973         INIT_LIST_HEAD(&new_aa->aa_exts);
1974         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1975         new_aa->aa_resends = aa->aa_resends;
1976
1977         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1978                 if (oap->oap_request) {
1979                         ptlrpc_req_finished(oap->oap_request);
1980                         oap->oap_request = ptlrpc_request_addref(new_req);
1981                 }
1982         }
1983
1984         /* XXX: This code will run into problem if we're going to support
1985          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1986          * and wait for all of them to be finished. We should inherit request
1987          * set from old request. */
1988         ptlrpcd_add_req(new_req);
1989
1990         DEBUG_REQ(D_INFO, new_req, "new request");
1991         RETURN(0);
1992 }
1993
1994 /*
1995  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1996  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1997  * fine for our small page arrays and doesn't require allocation.  its an
1998  * insertion sort that swaps elements that are strides apart, shrinking the
1999  * stride down until its '1' and the array is sorted.
2000  */
2001 static void sort_brw_pages(struct brw_page **array, int num)
2002 {
2003         int stride, i, j;
2004         struct brw_page *tmp;
2005
2006         if (num == 1)
2007                 return;
2008         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2009                 ;
2010
2011         do {
2012                 stride /= 3;
2013                 for (i = stride ; i < num ; i++) {
2014                         tmp = array[i];
2015                         j = i;
2016                         while (j >= stride && array[j - stride]->off > tmp->off) {
2017                                 array[j] = array[j - stride];
2018                                 j -= stride;
2019                         }
2020                         array[j] = tmp;
2021                 }
2022         } while (stride > 1);
2023 }
2024
2025 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2026 {
2027         LASSERT(ppga != NULL);
2028         OBD_FREE(ppga, sizeof(*ppga) * count);
2029 }
2030
2031 static int brw_interpret(const struct lu_env *env,
2032                          struct ptlrpc_request *req, void *args, int rc)
2033 {
2034         struct osc_brw_async_args *aa = args;
2035         struct osc_extent *ext;
2036         struct osc_extent *tmp;
2037         struct client_obd *cli = aa->aa_cli;
2038         unsigned long transferred = 0;
2039
2040         ENTRY;
2041
2042         rc = osc_brw_fini_request(req, rc);
2043         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2044         /*
2045          * When server returns -EINPROGRESS, client should always retry
2046          * regardless of the number of times the bulk was resent already.
2047          */
2048         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2049                 if (req->rq_import_generation !=
2050                     req->rq_import->imp_generation) {
2051                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2052                                ""DOSTID", rc = %d.\n",
2053                                req->rq_import->imp_obd->obd_name,
2054                                POSTID(&aa->aa_oa->o_oi), rc);
2055                 } else if (rc == -EINPROGRESS ||
2056                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2057                         rc = osc_brw_redo_request(req, aa, rc);
2058                 } else {
2059                         CERROR("%s: too many resent retries for object: "
2060                                "%llu:%llu, rc = %d.\n",
2061                                req->rq_import->imp_obd->obd_name,
2062                                POSTID(&aa->aa_oa->o_oi), rc);
2063                 }
2064
2065                 if (rc == 0)
2066                         RETURN(0);
2067                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2068                         rc = -EIO;
2069         }
2070
2071         if (rc == 0) {
2072                 struct obdo *oa = aa->aa_oa;
2073                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2074                 unsigned long valid = 0;
2075                 struct cl_object *obj;
2076                 struct osc_async_page *last;
2077
2078                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2079                 obj = osc2cl(last->oap_obj);
2080
2081                 cl_object_attr_lock(obj);
2082                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2083                         attr->cat_blocks = oa->o_blocks;
2084                         valid |= CAT_BLOCKS;
2085                 }
2086                 if (oa->o_valid & OBD_MD_FLMTIME) {
2087                         attr->cat_mtime = oa->o_mtime;
2088                         valid |= CAT_MTIME;
2089                 }
2090                 if (oa->o_valid & OBD_MD_FLATIME) {
2091                         attr->cat_atime = oa->o_atime;
2092                         valid |= CAT_ATIME;
2093                 }
2094                 if (oa->o_valid & OBD_MD_FLCTIME) {
2095                         attr->cat_ctime = oa->o_ctime;
2096                         valid |= CAT_CTIME;
2097                 }
2098
2099                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2100                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2101                         loff_t last_off = last->oap_count + last->oap_obj_off +
2102                                 last->oap_page_off;
2103
2104                         /* Change file size if this is an out of quota or
2105                          * direct IO write and it extends the file size */
2106                         if (loi->loi_lvb.lvb_size < last_off) {
2107                                 attr->cat_size = last_off;
2108                                 valid |= CAT_SIZE;
2109                         }
2110                         /* Extend KMS if it's not a lockless write */
2111                         if (loi->loi_kms < last_off &&
2112                             oap2osc_page(last)->ops_srvlock == 0) {
2113                                 attr->cat_kms = last_off;
2114                                 valid |= CAT_KMS;
2115                         }
2116                 }
2117
2118                 if (valid != 0)
2119                         cl_object_attr_update(env, obj, attr, valid);
2120                 cl_object_attr_unlock(obj);
2121         }
2122         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2123         aa->aa_oa = NULL;
2124
2125         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2126                 osc_inc_unstable_pages(req);
2127
2128         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2129                 list_del_init(&ext->oe_link);
2130                 osc_extent_finish(env, ext, 1,
2131                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2132         }
2133         LASSERT(list_empty(&aa->aa_exts));
2134         LASSERT(list_empty(&aa->aa_oaps));
2135
2136         transferred = (req->rq_bulk == NULL ? /* short io */
2137                        aa->aa_requested_nob :
2138                        req->rq_bulk->bd_nob_transferred);
2139
2140         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2141         ptlrpc_lprocfs_brw(req, transferred);
2142
2143         spin_lock(&cli->cl_loi_list_lock);
2144         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2145          * is called so we know whether to go to sync BRWs or wait for more
2146          * RPCs to complete */
2147         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2148                 cli->cl_w_in_flight--;
2149         else
2150                 cli->cl_r_in_flight--;
2151         osc_wake_cache_waiters(cli);
2152         spin_unlock(&cli->cl_loi_list_lock);
2153
2154         osc_io_unplug(env, cli, NULL);
2155         RETURN(rc);
2156 }
2157
2158 static void brw_commit(struct ptlrpc_request *req)
2159 {
2160         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2161          * this called via the rq_commit_cb, I need to ensure
2162          * osc_dec_unstable_pages is still called. Otherwise unstable
2163          * pages may be leaked. */
2164         spin_lock(&req->rq_lock);
2165         if (likely(req->rq_unstable)) {
2166                 req->rq_unstable = 0;
2167                 spin_unlock(&req->rq_lock);
2168
2169                 osc_dec_unstable_pages(req);
2170         } else {
2171                 req->rq_committed = 1;
2172                 spin_unlock(&req->rq_lock);
2173         }
2174 }
2175
2176 /**
2177  * Build an RPC by the list of extent @ext_list. The caller must ensure
2178  * that the total pages in this list are NOT over max pages per RPC.
2179  * Extents in the list must be in OES_RPC state.
2180  */
2181 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2182                   struct list_head *ext_list, int cmd)
2183 {
2184         struct ptlrpc_request           *req = NULL;
2185         struct osc_extent               *ext;
2186         struct brw_page                 **pga = NULL;
2187         struct osc_brw_async_args       *aa = NULL;
2188         struct obdo                     *oa = NULL;
2189         struct osc_async_page           *oap;
2190         struct osc_object               *obj = NULL;
2191         struct cl_req_attr              *crattr = NULL;
2192         loff_t                          starting_offset = OBD_OBJECT_EOF;
2193         loff_t                          ending_offset = 0;
2194         int                             mpflag = 0;
2195         int                             mem_tight = 0;
2196         int                             page_count = 0;
2197         bool                            soft_sync = false;
2198         bool                            interrupted = false;
2199         bool                            ndelay = false;
2200         int                             i;
2201         int                             grant = 0;
2202         int                             rc;
2203         __u32                           layout_version = 0;
2204         LIST_HEAD(rpc_list);
2205         struct ost_body                 *body;
2206         ENTRY;
2207         LASSERT(!list_empty(ext_list));
2208
2209         /* add pages into rpc_list to build BRW rpc */
2210         list_for_each_entry(ext, ext_list, oe_link) {
2211                 LASSERT(ext->oe_state == OES_RPC);
2212                 mem_tight |= ext->oe_memalloc;
2213                 grant += ext->oe_grants;
2214                 page_count += ext->oe_nr_pages;
2215                 layout_version = MAX(layout_version, ext->oe_layout_version);
2216                 if (obj == NULL)
2217                         obj = ext->oe_obj;
2218         }
2219
2220         soft_sync = osc_over_unstable_soft_limit(cli);
2221         if (mem_tight)
2222                 mpflag = cfs_memory_pressure_get_and_set();
2223
2224         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2225         if (pga == NULL)
2226                 GOTO(out, rc = -ENOMEM);
2227
2228         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2229         if (oa == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         i = 0;
2233         list_for_each_entry(ext, ext_list, oe_link) {
2234                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2235                         if (mem_tight)
2236                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2237                         if (soft_sync)
2238                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2239                         pga[i] = &oap->oap_brw_page;
2240                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2241                         i++;
2242
2243                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2244                         if (starting_offset == OBD_OBJECT_EOF ||
2245                             starting_offset > oap->oap_obj_off)
2246                                 starting_offset = oap->oap_obj_off;
2247                         else
2248                                 LASSERT(oap->oap_page_off == 0);
2249                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2250                                 ending_offset = oap->oap_obj_off +
2251                                                 oap->oap_count;
2252                         else
2253                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2254                                         PAGE_SIZE);
2255                         if (oap->oap_interrupted)
2256                                 interrupted = true;
2257                 }
2258                 if (ext->oe_ndelay)
2259                         ndelay = true;
2260         }
2261
2262         /* first page in the list */
2263         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2264
2265         crattr = &osc_env_info(env)->oti_req_attr;
2266         memset(crattr, 0, sizeof(*crattr));
2267         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2268         crattr->cra_flags = ~0ULL;
2269         crattr->cra_page = oap2cl_page(oap);
2270         crattr->cra_oa = oa;
2271         cl_req_attr_set(env, osc2cl(obj), crattr);
2272
2273         if (cmd == OBD_BRW_WRITE) {
2274                 oa->o_grant_used = grant;
2275                 if (layout_version > 0) {
2276                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2277                                PFID(&oa->o_oi.oi_fid), layout_version);
2278
2279                         oa->o_layout_version = layout_version;
2280                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2281                 }
2282         }
2283
2284         sort_brw_pages(pga, page_count);
2285         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2286         if (rc != 0) {
2287                 CERROR("prep_req failed: %d\n", rc);
2288                 GOTO(out, rc);
2289         }
2290
2291         req->rq_commit_cb = brw_commit;
2292         req->rq_interpret_reply = brw_interpret;
2293         req->rq_memalloc = mem_tight != 0;
2294         oap->oap_request = ptlrpc_request_addref(req);
2295         if (interrupted && !req->rq_intr)
2296                 ptlrpc_mark_interrupted(req);
2297         if (ndelay) {
2298                 req->rq_no_resend = req->rq_no_delay = 1;
2299                 /* probably set a shorter timeout value.
2300                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2301                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2302         }
2303
2304         /* Need to update the timestamps after the request is built in case
2305          * we race with setattr (locally or in queue at OST).  If OST gets
2306          * later setattr before earlier BRW (as determined by the request xid),
2307          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2308          * way to do this in a single call.  bug 10150 */
2309         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2310         crattr->cra_oa = &body->oa;
2311         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2312         cl_req_attr_set(env, osc2cl(obj), crattr);
2313         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2314
2315         aa = ptlrpc_req_async_args(aa, req);
2316         INIT_LIST_HEAD(&aa->aa_oaps);
2317         list_splice_init(&rpc_list, &aa->aa_oaps);
2318         INIT_LIST_HEAD(&aa->aa_exts);
2319         list_splice_init(ext_list, &aa->aa_exts);
2320
2321         spin_lock(&cli->cl_loi_list_lock);
2322         starting_offset >>= PAGE_SHIFT;
2323         if (cmd == OBD_BRW_READ) {
2324                 cli->cl_r_in_flight++;
2325                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2326                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2327                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2328                                       starting_offset + 1);
2329         } else {
2330                 cli->cl_w_in_flight++;
2331                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2332                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2333                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2334                                       starting_offset + 1);
2335         }
2336         spin_unlock(&cli->cl_loi_list_lock);
2337
2338         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2339                   page_count, aa, cli->cl_r_in_flight,
2340                   cli->cl_w_in_flight);
2341         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2342
2343         ptlrpcd_add_req(req);
2344         rc = 0;
2345         EXIT;
2346
2347 out:
2348         if (mem_tight != 0)
2349                 cfs_memory_pressure_restore(mpflag);
2350
2351         if (rc != 0) {
2352                 LASSERT(req == NULL);
2353
2354                 if (oa)
2355                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2356                 if (pga)
2357                         OBD_FREE(pga, sizeof(*pga) * page_count);
2358                 /* this should happen rarely and is pretty bad, it makes the
2359                  * pending list not follow the dirty order */
2360                 while (!list_empty(ext_list)) {
2361                         ext = list_entry(ext_list->next, struct osc_extent,
2362                                          oe_link);
2363                         list_del_init(&ext->oe_link);
2364                         osc_extent_finish(env, ext, 0, rc);
2365                 }
2366         }
2367         RETURN(rc);
2368 }
2369
2370 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2371 {
2372         int set = 0;
2373
2374         LASSERT(lock != NULL);
2375
2376         lock_res_and_lock(lock);
2377
2378         if (lock->l_ast_data == NULL)
2379                 lock->l_ast_data = data;
2380         if (lock->l_ast_data == data)
2381                 set = 1;
2382
2383         unlock_res_and_lock(lock);
2384
2385         return set;
2386 }
2387
2388 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2389                      void *cookie, struct lustre_handle *lockh,
2390                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2391                      int errcode)
2392 {
2393         bool intent = *flags & LDLM_FL_HAS_INTENT;
2394         int rc;
2395         ENTRY;
2396
2397         /* The request was created before ldlm_cli_enqueue call. */
2398         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2399                 struct ldlm_reply *rep;
2400
2401                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2402                 LASSERT(rep != NULL);
2403
2404                 rep->lock_policy_res1 =
2405                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2406                 if (rep->lock_policy_res1)
2407                         errcode = rep->lock_policy_res1;
2408                 if (!speculative)
2409                         *flags |= LDLM_FL_LVB_READY;
2410         } else if (errcode == ELDLM_OK) {
2411                 *flags |= LDLM_FL_LVB_READY;
2412         }
2413
2414         /* Call the update callback. */
2415         rc = (*upcall)(cookie, lockh, errcode);
2416
2417         /* release the reference taken in ldlm_cli_enqueue() */
2418         if (errcode == ELDLM_LOCK_MATCHED)
2419                 errcode = ELDLM_OK;
2420         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2421                 ldlm_lock_decref(lockh, mode);
2422
2423         RETURN(rc);
2424 }
2425
2426 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2427                           void *args, int rc)
2428 {
2429         struct osc_enqueue_args *aa = args;
2430         struct ldlm_lock *lock;
2431         struct lustre_handle *lockh = &aa->oa_lockh;
2432         enum ldlm_mode mode = aa->oa_mode;
2433         struct ost_lvb *lvb = aa->oa_lvb;
2434         __u32 lvb_len = sizeof(*lvb);
2435         __u64 flags = 0;
2436
2437         ENTRY;
2438
2439         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2440          * be valid. */
2441         lock = ldlm_handle2lock(lockh);
2442         LASSERTF(lock != NULL,
2443                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2444                  lockh->cookie, req, aa);
2445
2446         /* Take an additional reference so that a blocking AST that
2447          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2448          * to arrive after an upcall has been executed by
2449          * osc_enqueue_fini(). */
2450         ldlm_lock_addref(lockh, mode);
2451
2452         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2453         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2454
2455         /* Let CP AST to grant the lock first. */
2456         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2457
2458         if (aa->oa_speculative) {
2459                 LASSERT(aa->oa_lvb == NULL);
2460                 LASSERT(aa->oa_flags == NULL);
2461                 aa->oa_flags = &flags;
2462         }
2463
2464         /* Complete obtaining the lock procedure. */
2465         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2466                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2467                                    lockh, rc);
2468         /* Complete osc stuff. */
2469         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2470                               aa->oa_flags, aa->oa_speculative, rc);
2471
2472         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2473
2474         ldlm_lock_decref(lockh, mode);
2475         LDLM_LOCK_PUT(lock);
2476         RETURN(rc);
2477 }
2478
/* Sentinel request-set pointer: osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2480
2481 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2482  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2483  * other synchronous requests, however keeping some locks and trying to obtain
2484  * others may take a considerable amount of time in a case of ost failure; and
2485  * when other sync requests do not get released lock from a client, the client
2486  * is evicted from the cluster -- such scenarious make the life difficult, so
2487  * release locks just after they are obtained. */
2488 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2489                      __u64 *flags, union ldlm_policy_data *policy,
2490                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2491                      void *cookie, struct ldlm_enqueue_info *einfo,
2492                      struct ptlrpc_request_set *rqset, int async,
2493                      bool speculative)
2494 {
2495         struct obd_device *obd = exp->exp_obd;
2496         struct lustre_handle lockh = { 0 };
2497         struct ptlrpc_request *req = NULL;
2498         int intent = *flags & LDLM_FL_HAS_INTENT;
2499         __u64 match_flags = *flags;
2500         enum ldlm_mode mode;
2501         int rc;
2502         ENTRY;
2503
2504         /* Filesystem lock extents are extended to page boundaries so that
2505          * dealing with the page cache is a little smoother.  */
2506         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2507         policy->l_extent.end |= ~PAGE_MASK;
2508
2509         /* Next, search for already existing extent locks that will cover us */
2510         /* If we're trying to read, we also search for an existing PW lock.  The
2511          * VFS and page cache already protect us locally, so lots of readers/
2512          * writers can share a single PW lock.
2513          *
2514          * There are problems with conversion deadlocks, so instead of
2515          * converting a read lock to a write lock, we'll just enqueue a new
2516          * one.
2517          *
2518          * At some point we should cancel the read lock instead of making them
2519          * send us a blocking callback, but there are problems with canceling
2520          * locks out from other users right now, too. */
2521         mode = einfo->ei_mode;
2522         if (einfo->ei_mode == LCK_PR)
2523                 mode |= LCK_PW;
2524         /* Normal lock requests must wait for the LVB to be ready before
2525          * matching a lock; speculative lock requests do not need to,
2526          * because they will not actually use the lock. */
2527         if (!speculative)
2528                 match_flags |= LDLM_FL_LVB_READY;
2529         if (intent != 0)
2530                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2531         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2532                                einfo->ei_type, policy, mode, &lockh, 0);
2533         if (mode) {
2534                 struct ldlm_lock *matched;
2535
2536                 if (*flags & LDLM_FL_TEST_LOCK)
2537                         RETURN(ELDLM_OK);
2538
2539                 matched = ldlm_handle2lock(&lockh);
2540                 if (speculative) {
2541                         /* This DLM lock request is speculative, and does not
2542                          * have an associated IO request. Therefore if there
2543                          * is already a DLM lock, it wll just inform the
2544                          * caller to cancel the request for this stripe.*/
2545                         lock_res_and_lock(matched);
2546                         if (ldlm_extent_equal(&policy->l_extent,
2547                             &matched->l_policy_data.l_extent))
2548                                 rc = -EEXIST;
2549                         else
2550                                 rc = -ECANCELED;
2551                         unlock_res_and_lock(matched);
2552
2553                         ldlm_lock_decref(&lockh, mode);
2554                         LDLM_LOCK_PUT(matched);
2555                         RETURN(rc);
2556                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2557                         *flags |= LDLM_FL_LVB_READY;
2558
2559                         /* We already have a lock, and it's referenced. */
2560                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2561
2562                         ldlm_lock_decref(&lockh, mode);
2563                         LDLM_LOCK_PUT(matched);
2564                         RETURN(ELDLM_OK);
2565                 } else {
2566                         ldlm_lock_decref(&lockh, mode);
2567                         LDLM_LOCK_PUT(matched);
2568                 }
2569         }
2570
2571         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2572                 RETURN(-ENOLCK);
2573
2574         if (intent) {
2575                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2576                                            &RQF_LDLM_ENQUEUE_LVB);
2577                 if (req == NULL)
2578                         RETURN(-ENOMEM);
2579
2580                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2581                 if (rc) {
2582                         ptlrpc_request_free(req);
2583                         RETURN(rc);
2584                 }
2585
2586                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2587                                      sizeof *lvb);
2588                 ptlrpc_request_set_replen(req);
2589         }
2590
2591         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2592         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2593
2594         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2595                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2596         if (async) {
2597                 if (!rc) {
2598                         struct osc_enqueue_args *aa;
2599                         aa = ptlrpc_req_async_args(aa, req);
2600                         aa->oa_exp         = exp;
2601                         aa->oa_mode        = einfo->ei_mode;
2602                         aa->oa_type        = einfo->ei_type;
2603                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2604                         aa->oa_upcall      = upcall;
2605                         aa->oa_cookie      = cookie;
2606                         aa->oa_speculative = speculative;
2607                         if (!speculative) {
2608                                 aa->oa_flags  = flags;
2609                                 aa->oa_lvb    = lvb;
2610                         } else {
2611                                 /* speculative locks are essentially to enqueue
2612                                  * a DLM lock  in advance, so we don't care
2613                                  * about the result of the enqueue. */
2614                                 aa->oa_lvb    = NULL;
2615                                 aa->oa_flags  = NULL;
2616                         }
2617
2618                         req->rq_interpret_reply = osc_enqueue_interpret;
2619                         if (rqset == PTLRPCD_SET)
2620                                 ptlrpcd_add_req(req);
2621                         else
2622                                 ptlrpc_set_add_req(rqset, req);
2623                 } else if (intent) {
2624                         ptlrpc_req_finished(req);
2625                 }
2626                 RETURN(rc);
2627         }
2628
2629         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2630                               flags, speculative, rc);
2631         if (intent)
2632                 ptlrpc_req_finished(req);
2633
2634         RETURN(rc);
2635 }
2636
2637 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2638                    struct ldlm_res_id *res_id, enum ldlm_type type,
2639                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2640                    __u64 *flags, struct osc_object *obj,
2641                    struct lustre_handle *lockh, int unref)
2642 {
2643         struct obd_device *obd = exp->exp_obd;
2644         __u64 lflags = *flags;
2645         enum ldlm_mode rc;
2646         ENTRY;
2647
2648         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2649                 RETURN(-EIO);
2650
2651         /* Filesystem lock extents are extended to page boundaries so that
2652          * dealing with the page cache is a little smoother */
2653         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2654         policy->l_extent.end |= ~PAGE_MASK;
2655
2656         /* Next, search for already existing extent locks that will cover us */
2657         /* If we're trying to read, we also search for an existing PW lock.  The
2658          * VFS and page cache already protect us locally, so lots of readers/
2659          * writers can share a single PW lock. */
2660         rc = mode;
2661         if (mode == LCK_PR)
2662                 rc |= LCK_PW;
2663         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2664                              res_id, type, policy, rc, lockh, unref);
2665         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2666                 RETURN(rc);
2667
2668         if (obj != NULL) {
2669                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2670
2671                 LASSERT(lock != NULL);
2672                 if (osc_set_lock_data(lock, obj)) {
2673                         lock_res_and_lock(lock);
2674                         if (!ldlm_is_lvb_cached(lock)) {
2675                                 LASSERT(lock->l_ast_data == obj);
2676                                 osc_lock_lvb_update(env, obj, lock, NULL);
2677                                 ldlm_set_lvb_cached(lock);
2678                         }
2679                         unlock_res_and_lock(lock);
2680                 } else {
2681                         ldlm_lock_decref(lockh, rc);
2682                         rc = 0;
2683                 }
2684                 LDLM_LOCK_PUT(lock);
2685         }
2686         RETURN(rc);
2687 }
2688
2689 static int osc_statfs_interpret(const struct lu_env *env,
2690                                 struct ptlrpc_request *req, void *args, int rc)
2691 {
2692         struct osc_async_args *aa = args;
2693         struct obd_statfs *msfs;
2694
2695         ENTRY;
2696         if (rc == -EBADR)
2697                 /*
2698                  * The request has in fact never been sent due to issues at
2699                  * a higher level (LOV).  Exit immediately since the caller
2700                  * is aware of the problem and takes care of the clean up.
2701                  */
2702                 RETURN(rc);
2703
2704         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2705             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2706                 GOTO(out, rc = 0);
2707
2708         if (rc != 0)
2709                 GOTO(out, rc);
2710
2711         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712         if (msfs == NULL)
2713                 GOTO(out, rc = -EPROTO);
2714
2715         *aa->aa_oi->oi_osfs = *msfs;
2716 out:
2717         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2718
2719         RETURN(rc);
2720 }
2721
2722 static int osc_statfs_async(struct obd_export *exp,
2723                             struct obd_info *oinfo, time64_t max_age,
2724                             struct ptlrpc_request_set *rqset)
2725 {
2726         struct obd_device     *obd = class_exp2obd(exp);
2727         struct ptlrpc_request *req;
2728         struct osc_async_args *aa;
2729         int rc;
2730         ENTRY;
2731
2732         if (obd->obd_osfs_age >= max_age) {
2733                 CDEBUG(D_SUPER,
2734                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2735                        obd->obd_name, &obd->obd_osfs,
2736                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2737                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2738                 spin_lock(&obd->obd_osfs_lock);
2739                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2740                 spin_unlock(&obd->obd_osfs_lock);
2741                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2742                 if (oinfo->oi_cb_up)
2743                         oinfo->oi_cb_up(oinfo, 0);
2744
2745                 RETURN(0);
2746         }
2747
2748         /* We could possibly pass max_age in the request (as an absolute
2749          * timestamp or a "seconds.usec ago") so the target can avoid doing
2750          * extra calls into the filesystem if that isn't necessary (e.g.
2751          * during mount that would help a bit).  Having relative timestamps
2752          * is not so great if request processing is slow, while absolute
2753          * timestamps are not ideal because they need time synchronization. */
2754         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2755         if (req == NULL)
2756                 RETURN(-ENOMEM);
2757
2758         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2759         if (rc) {
2760                 ptlrpc_request_free(req);
2761                 RETURN(rc);
2762         }
2763         ptlrpc_request_set_replen(req);
2764         req->rq_request_portal = OST_CREATE_PORTAL;
2765         ptlrpc_at_set_req_timeout(req);
2766
2767         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2768                 /* procfs requests not want stat in wait for avoid deadlock */
2769                 req->rq_no_resend = 1;
2770                 req->rq_no_delay = 1;
2771         }
2772
2773         req->rq_interpret_reply = osc_statfs_interpret;
2774         aa = ptlrpc_req_async_args(aa, req);
2775         aa->aa_oi = oinfo;
2776
2777         ptlrpc_set_add_req(rqset, req);
2778         RETURN(0);
2779 }
2780
2781 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2782                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2783 {
2784         struct obd_device     *obd = class_exp2obd(exp);
2785         struct obd_statfs     *msfs;
2786         struct ptlrpc_request *req;
2787         struct obd_import     *imp = NULL;
2788         int rc;
2789         ENTRY;
2790
2791
2792         /*Since the request might also come from lprocfs, so we need
2793          *sync this with client_disconnect_export Bug15684*/
2794         down_read(&obd->u.cli.cl_sem);
2795         if (obd->u.cli.cl_import)
2796                 imp = class_import_get(obd->u.cli.cl_import);
2797         up_read(&obd->u.cli.cl_sem);
2798         if (!imp)
2799                 RETURN(-ENODEV);
2800
2801         /* We could possibly pass max_age in the request (as an absolute
2802          * timestamp or a "seconds.usec ago") so the target can avoid doing
2803          * extra calls into the filesystem if that isn't necessary (e.g.
2804          * during mount that would help a bit).  Having relative timestamps
2805          * is not so great if request processing is slow, while absolute
2806          * timestamps are not ideal because they need time synchronization. */
2807         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2808
2809         class_import_put(imp);
2810
2811         if (req == NULL)
2812                 RETURN(-ENOMEM);
2813
2814         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2815         if (rc) {
2816                 ptlrpc_request_free(req);
2817                 RETURN(rc);
2818         }
2819         ptlrpc_request_set_replen(req);
2820         req->rq_request_portal = OST_CREATE_PORTAL;
2821         ptlrpc_at_set_req_timeout(req);
2822
2823         if (flags & OBD_STATFS_NODELAY) {
2824                 /* procfs requests not want stat in wait for avoid deadlock */
2825                 req->rq_no_resend = 1;
2826                 req->rq_no_delay = 1;
2827         }
2828
2829         rc = ptlrpc_queue_wait(req);
2830         if (rc)
2831                 GOTO(out, rc);
2832
2833         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2834         if (msfs == NULL)
2835                 GOTO(out, rc = -EPROTO);
2836
2837         *osfs = *msfs;
2838
2839         EXIT;
2840 out:
2841         ptlrpc_req_finished(req);
2842         return rc;
2843 }
2844
2845 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2846                          void *karg, void __user *uarg)
2847 {
2848         struct obd_device *obd = exp->exp_obd;
2849         struct obd_ioctl_data *data = karg;
2850         int rc = 0;
2851
2852         ENTRY;
2853         if (!try_module_get(THIS_MODULE)) {
2854                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2855                        module_name(THIS_MODULE));
2856                 return -EINVAL;
2857         }
2858         switch (cmd) {
2859         case OBD_IOC_CLIENT_RECOVER:
2860                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2861                                            data->ioc_inlbuf1, 0);
2862                 if (rc > 0)
2863                         rc = 0;
2864                 break;
2865         case IOC_OSC_SET_ACTIVE:
2866                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2867                                               data->ioc_offset);
2868                 break;
2869         default:
2870                 rc = -ENOTTY;
2871                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2872                        obd->obd_name, cmd, current_comm(), rc);
2873                 break;
2874         }
2875
2876         module_put(THIS_MODULE);
2877         return rc;
2878 }
2879
2880 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2881                        u32 keylen, void *key, u32 vallen, void *val,
2882                        struct ptlrpc_request_set *set)
2883 {
2884         struct ptlrpc_request *req;
2885         struct obd_device     *obd = exp->exp_obd;
2886         struct obd_import     *imp = class_exp2cliimp(exp);
2887         char                  *tmp;
2888         int                    rc;
2889         ENTRY;
2890
2891         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2892
2893         if (KEY_IS(KEY_CHECKSUM)) {
2894                 if (vallen != sizeof(int))
2895                         RETURN(-EINVAL);
2896                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2897                 RETURN(0);
2898         }
2899
2900         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2901                 sptlrpc_conf_client_adapt(obd);
2902                 RETURN(0);
2903         }
2904
2905         if (KEY_IS(KEY_FLUSH_CTX)) {
2906                 sptlrpc_import_flush_my_ctx(imp);
2907                 RETURN(0);
2908         }
2909
2910         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2911                 struct client_obd *cli = &obd->u.cli;
2912                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2913                 long target = *(long *)val;
2914
2915                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2916                 *(long *)val -= nr;
2917                 RETURN(0);
2918         }
2919
2920         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2921                 RETURN(-EINVAL);
2922
2923         /* We pass all other commands directly to OST. Since nobody calls osc
2924            methods directly and everybody is supposed to go through LOV, we
2925            assume lov checked invalid values for us.
2926            The only recognised values so far are evict_by_nid and mds_conn.
2927            Even if something bad goes through, we'd get a -EINVAL from OST
2928            anyway. */
2929
2930         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2931                                                 &RQF_OST_SET_GRANT_INFO :
2932                                                 &RQF_OBD_SET_INFO);
2933         if (req == NULL)
2934                 RETURN(-ENOMEM);
2935
2936         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2937                              RCL_CLIENT, keylen);
2938         if (!KEY_IS(KEY_GRANT_SHRINK))
2939                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2940                                      RCL_CLIENT, vallen);
2941         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2942         if (rc) {
2943                 ptlrpc_request_free(req);
2944                 RETURN(rc);
2945         }
2946
2947         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2948         memcpy(tmp, key, keylen);
2949         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2950                                                         &RMF_OST_BODY :
2951                                                         &RMF_SETINFO_VAL);
2952         memcpy(tmp, val, vallen);
2953
2954         if (KEY_IS(KEY_GRANT_SHRINK)) {
2955                 struct osc_grant_args *aa;
2956                 struct obdo *oa;
2957
2958                 aa = ptlrpc_req_async_args(aa, req);
2959                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2960                 if (!oa) {
2961                         ptlrpc_req_finished(req);
2962                         RETURN(-ENOMEM);
2963                 }
2964                 *oa = ((struct ost_body *)val)->oa;
2965                 aa->aa_oa = oa;
2966                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2967         }
2968
2969         ptlrpc_request_set_replen(req);
2970         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2971                 LASSERT(set != NULL);
2972                 ptlrpc_set_add_req(set, req);
2973                 ptlrpc_check_set(NULL, set);
2974         } else {
2975                 ptlrpcd_add_req(req);
2976         }
2977
2978         RETURN(0);
2979 }
2980 EXPORT_SYMBOL(osc_set_info_async);
2981
2982 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2983                   struct obd_device *obd, struct obd_uuid *cluuid,
2984                   struct obd_connect_data *data, void *localdata)
2985 {
2986         struct client_obd *cli = &obd->u.cli;
2987
2988         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2989                 long lost_grant;
2990                 long grant;
2991
2992                 spin_lock(&cli->cl_loi_list_lock);
2993                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2994                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2995                         /* restore ocd_grant_blkbits as client page bits */
2996                         data->ocd_grant_blkbits = PAGE_SHIFT;
2997                         grant += cli->cl_dirty_grant;
2998                 } else {
2999                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3000                 }
3001                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3002                 lost_grant = cli->cl_lost_grant;
3003                 cli->cl_lost_grant = 0;
3004                 spin_unlock(&cli->cl_loi_list_lock);
3005
3006                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3007                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3008                        data->ocd_version, data->ocd_grant, lost_grant);
3009         }
3010
3011         RETURN(0);
3012 }
3013 EXPORT_SYMBOL(osc_reconnect);
3014
3015 int osc_disconnect(struct obd_export *exp)
3016 {
3017         struct obd_device *obd = class_exp2obd(exp);
3018         int rc;
3019
3020         rc = client_disconnect_export(exp);
3021         /**
3022          * Initially we put del_shrink_grant before disconnect_export, but it
3023          * causes the following problem if setup (connect) and cleanup
3024          * (disconnect) are tangled together.
3025          *      connect p1                     disconnect p2
3026          *   ptlrpc_connect_import
3027          *     ...............               class_manual_cleanup
3028          *                                     osc_disconnect
3029          *                                     del_shrink_grant
3030          *   ptlrpc_connect_interrupt
3031          *     osc_init_grant
3032          *   add this client to shrink list
3033          *                                      cleanup_osc
3034          * Bang! grant shrink thread trigger the shrink. BUG18662
3035          */
3036         osc_del_grant_list(&obd->u.cli);
3037         return rc;
3038 }
3039 EXPORT_SYMBOL(osc_disconnect);
3040
3041 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3042                                  struct hlist_node *hnode, void *arg)
3043 {
3044         struct lu_env *env = arg;
3045         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3046         struct ldlm_lock *lock;
3047         struct osc_object *osc = NULL;
3048         ENTRY;
3049
3050         lock_res(res);
3051         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3052                 if (lock->l_ast_data != NULL && osc == NULL) {
3053                         osc = lock->l_ast_data;
3054                         cl_object_get(osc2cl(osc));
3055                 }
3056
3057                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3058                  * by the 2nd round of ldlm_namespace_clean() call in
3059                  * osc_import_event(). */
3060                 ldlm_clear_cleaned(lock);
3061         }
3062         unlock_res(res);
3063
3064         if (osc != NULL) {
3065                 osc_object_invalidate(env, osc);
3066                 cl_object_put(env, osc2cl(osc));
3067         }
3068
3069         RETURN(0);
3070 }
3071 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3072
3073 static int osc_import_event(struct obd_device *obd,
3074                             struct obd_import *imp,
3075                             enum obd_import_event event)
3076 {
3077         struct client_obd *cli;
3078         int rc = 0;
3079
3080         ENTRY;
3081         LASSERT(imp->imp_obd == obd);
3082
3083         switch (event) {
3084         case IMP_EVENT_DISCON: {
3085                 cli = &obd->u.cli;
3086                 spin_lock(&cli->cl_loi_list_lock);
3087                 cli->cl_avail_grant = 0;
3088                 cli->cl_lost_grant = 0;
3089                 spin_unlock(&cli->cl_loi_list_lock);
3090                 break;
3091         }
3092         case IMP_EVENT_INACTIVE: {
3093                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3094                 break;
3095         }
3096         case IMP_EVENT_INVALIDATE: {
3097                 struct ldlm_namespace *ns = obd->obd_namespace;
3098                 struct lu_env         *env;
3099                 __u16                  refcheck;
3100
3101                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3102
3103                 env = cl_env_get(&refcheck);
3104                 if (!IS_ERR(env)) {
3105                         osc_io_unplug(env, &obd->u.cli, NULL);
3106
3107                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3108                                                  osc_ldlm_resource_invalidate,
3109                                                  env, 0);
3110                         cl_env_put(env, &refcheck);
3111
3112                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3113                 } else
3114                         rc = PTR_ERR(env);
3115                 break;
3116         }
3117         case IMP_EVENT_ACTIVE: {
3118                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3119                 break;
3120         }
3121         case IMP_EVENT_OCD: {
3122                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3123
3124                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3125                         osc_init_grant(&obd->u.cli, ocd);
3126
3127                 /* See bug 7198 */
3128                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3129                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3130
3131                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3132                 break;
3133         }
3134         case IMP_EVENT_DEACTIVATE: {
3135                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3136                 break;
3137         }
3138         case IMP_EVENT_ACTIVATE: {
3139                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3140                 break;
3141         }
3142         default:
3143                 CERROR("Unknown import event %d\n", event);
3144                 LBUG();
3145         }
3146         RETURN(rc);
3147 }
3148
3149 /**
3150  * Determine whether the lock can be canceled before replaying the lock
3151  * during recovery, see bug16774 for detailed information.
3152  *
3153  * \retval zero the lock can't be canceled
3154  * \retval other ok to cancel
3155  */
3156 static int osc_cancel_weight(struct ldlm_lock *lock)
3157 {
3158         /*
3159          * Cancel all unused and granted extent lock.
3160          */
3161         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3162             ldlm_is_granted(lock) &&
3163             osc_ldlm_weigh_ast(lock) == 0)
3164                 RETURN(1);
3165
3166         RETURN(0);
3167 }
3168
3169 static int brw_queue_work(const struct lu_env *env, void *data)
3170 {
3171         struct client_obd *cli = data;
3172
3173         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3174
3175         osc_io_unplug(env, cli, NULL);
3176         RETURN(0);
3177 }
3178
3179 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3180 {
3181         struct client_obd *cli = &obd->u.cli;
3182         void *handler;
3183         int rc;
3184
3185         ENTRY;
3186
3187         rc = ptlrpcd_addref();
3188         if (rc)
3189                 RETURN(rc);
3190
3191         rc = client_obd_setup(obd, lcfg);
3192         if (rc)
3193                 GOTO(out_ptlrpcd, rc);
3194
3195
3196         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3197         if (IS_ERR(handler))
3198                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3199         cli->cl_writeback_work = handler;
3200
3201         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3202         if (IS_ERR(handler))
3203                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3204         cli->cl_lru_work = handler;
3205
3206         rc = osc_quota_setup(obd);
3207         if (rc)
3208                 GOTO(out_ptlrpcd_work, rc);
3209
3210         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3211         osc_update_next_shrink(cli);
3212
3213         RETURN(rc);
3214
3215 out_ptlrpcd_work:
3216         if (cli->cl_writeback_work != NULL) {
3217                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3218                 cli->cl_writeback_work = NULL;
3219         }
3220         if (cli->cl_lru_work != NULL) {
3221                 ptlrpcd_destroy_work(cli->cl_lru_work);
3222                 cli->cl_lru_work = NULL;
3223         }
3224         client_obd_cleanup(obd);
3225 out_ptlrpcd:
3226         ptlrpcd_decref();
3227         RETURN(rc);
3228 }
3229 EXPORT_SYMBOL(osc_setup_common);
3230
3231 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3232 {
3233         struct client_obd *cli = &obd->u.cli;
3234         int                adding;
3235         int                added;
3236         int                req_count;
3237         int                rc;
3238
3239         ENTRY;
3240
3241         rc = osc_setup_common(obd, lcfg);
3242         if (rc < 0)
3243                 RETURN(rc);
3244
3245         rc = osc_tunables_init(obd);
3246         if (rc)
3247                 RETURN(rc);
3248
3249         /*
3250          * We try to control the total number of requests with a upper limit
3251          * osc_reqpool_maxreqcount. There might be some race which will cause
3252          * over-limit allocation, but it is fine.
3253          */
3254         req_count = atomic_read(&osc_pool_req_count);
3255         if (req_count < osc_reqpool_maxreqcount) {
3256                 adding = cli->cl_max_rpcs_in_flight + 2;
3257                 if (req_count + adding > osc_reqpool_maxreqcount)
3258                         adding = osc_reqpool_maxreqcount - req_count;
3259
3260                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3261                 atomic_add(added, &osc_pool_req_count);
3262         }
3263
3264         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3265
3266         spin_lock(&osc_shrink_lock);
3267         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3268         spin_unlock(&osc_shrink_lock);
3269         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3270         cli->cl_import->imp_idle_debug = D_HA;
3271
3272         RETURN(0);
3273 }
3274
3275 int osc_precleanup_common(struct obd_device *obd)
3276 {
3277         struct client_obd *cli = &obd->u.cli;
3278         ENTRY;
3279
3280         /* LU-464
3281          * for echo client, export may be on zombie list, wait for
3282          * zombie thread to cull it, because cli.cl_import will be
3283          * cleared in client_disconnect_export():
3284          *   class_export_destroy() -> obd_cleanup() ->
3285          *   echo_device_free() -> echo_client_cleanup() ->
3286          *   obd_disconnect() -> osc_disconnect() ->
3287          *   client_disconnect_export()
3288          */
3289         obd_zombie_barrier();
3290         if (cli->cl_writeback_work) {
3291                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3292                 cli->cl_writeback_work = NULL;
3293         }
3294
3295         if (cli->cl_lru_work) {
3296                 ptlrpcd_destroy_work(cli->cl_lru_work);
3297                 cli->cl_lru_work = NULL;
3298         }
3299
3300         obd_cleanup_client_import(obd);
3301         RETURN(0);
3302 }
3303 EXPORT_SYMBOL(osc_precleanup_common);
3304
3305 static int osc_precleanup(struct obd_device *obd)
3306 {
3307         ENTRY;
3308
3309         osc_precleanup_common(obd);
3310
3311         ptlrpc_lprocfs_unregister_obd(obd);
3312         RETURN(0);
3313 }
3314
3315 int osc_cleanup_common(struct obd_device *obd)
3316 {
3317         struct client_obd *cli = &obd->u.cli;
3318         int rc;
3319
3320         ENTRY;
3321
3322         spin_lock(&osc_shrink_lock);
3323         list_del(&cli->cl_shrink_list);
3324         spin_unlock(&osc_shrink_lock);
3325
3326         /* lru cleanup */
3327         if (cli->cl_cache != NULL) {
3328                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3329                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3330                 list_del_init(&cli->cl_lru_osc);
3331                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3332                 cli->cl_lru_left = NULL;
3333                 cl_cache_decref(cli->cl_cache);
3334                 cli->cl_cache = NULL;
3335         }
3336
3337         /* free memory of osc quota cache */
3338         osc_quota_cleanup(obd);
3339
3340         rc = client_obd_cleanup(obd);
3341
3342         ptlrpcd_decref();
3343         RETURN(rc);
3344 }
3345 EXPORT_SYMBOL(osc_cleanup_common);
3346
3347 static const struct obd_ops osc_obd_ops = {
3348         .o_owner                = THIS_MODULE,
3349         .o_setup                = osc_setup,
3350         .o_precleanup           = osc_precleanup,
3351         .o_cleanup              = osc_cleanup_common,
3352         .o_add_conn             = client_import_add_conn,
3353         .o_del_conn             = client_import_del_conn,
3354         .o_connect              = client_connect_import,
3355         .o_reconnect            = osc_reconnect,
3356         .o_disconnect           = osc_disconnect,
3357         .o_statfs               = osc_statfs,
3358         .o_statfs_async         = osc_statfs_async,
3359         .o_create               = osc_create,
3360         .o_destroy              = osc_destroy,
3361         .o_getattr              = osc_getattr,
3362         .o_setattr              = osc_setattr,
3363         .o_iocontrol            = osc_iocontrol,
3364         .o_set_info_async       = osc_set_info_async,
3365         .o_import_event         = osc_import_event,
3366         .o_quotactl             = osc_quotactl,
3367 };
3368
3369 static struct shrinker *osc_cache_shrinker;
3370 LIST_HEAD(osc_shrink_list);
3371 DEFINE_SPINLOCK(osc_shrink_lock);
3372
3373 #ifndef HAVE_SHRINKER_COUNT
3374 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3375 {
3376         struct shrink_control scv = {
3377                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3378                 .gfp_mask   = shrink_param(sc, gfp_mask)
3379         };
3380 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3381         struct shrinker *shrinker = NULL;
3382 #endif
3383
3384         (void)osc_cache_shrink_scan(shrinker, &scv);
3385
3386         return osc_cache_shrink_count(shrinker, &scv);
3387 }
3388 #endif
3389
3390 static int __init osc_init(void)
3391 {
3392         unsigned int reqpool_size;
3393         unsigned int reqsize;
3394         int rc;
3395         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3396                          osc_cache_shrink_count, osc_cache_shrink_scan);
3397         ENTRY;
3398
3399         /* print an address of _any_ initialized kernel symbol from this
3400          * module, to allow debugging with gdb that doesn't support data
3401          * symbols from modules.*/
3402         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3403
3404         rc = lu_kmem_init(osc_caches);
3405         if (rc)
3406                 RETURN(rc);
3407
3408         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3409                                  LUSTRE_OSC_NAME, &osc_device_type);
3410         if (rc)
3411                 GOTO(out_kmem, rc);
3412
3413         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3414
3415         /* This is obviously too much memory, only prevent overflow here */
3416         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3417                 GOTO(out_type, rc = -EINVAL);
3418
3419         reqpool_size = osc_reqpool_mem_max << 20;
3420
3421         reqsize = 1;
3422         while (reqsize < OST_IO_MAXREQSIZE)
3423                 reqsize = reqsize << 1;
3424
3425         /*
3426          * We don't enlarge the request count in OSC pool according to
3427          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3428          * tried after normal allocation failed. So a small OSC pool won't
3429          * cause much performance degression in most of cases.
3430          */
3431         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3432
3433         atomic_set(&osc_pool_req_count, 0);
3434         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3435                                           ptlrpc_add_rqs_to_pool);
3436
3437         if (osc_rq_pool == NULL)
3438                 GOTO(out_type, rc = -ENOMEM);
3439
3440         rc = osc_start_grant_work();
3441         if (rc != 0)
3442                 GOTO(out_req_pool, rc);
3443
3444         RETURN(rc);
3445
3446 out_req_pool:
3447         ptlrpc_free_rq_pool(osc_rq_pool);
3448 out_type:
3449         class_unregister_type(LUSTRE_OSC_NAME);
3450 out_kmem:
3451         lu_kmem_fini(osc_caches);
3452
3453         RETURN(rc);
3454 }
3455
3456 static void __exit osc_exit(void)
3457 {
3458         osc_stop_grant_work();
3459         remove_shrinker(osc_cache_shrinker);
3460         class_unregister_type(LUSTRE_OSC_NAME);
3461         lu_kmem_fini(osc_caches);
3462         ptlrpc_free_rq_pool(osc_rq_pool);
3463 }
3464
3465 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3466 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3467 MODULE_VERSION(LUSTRE_VERSION_STRING);
3468 MODULE_LICENSE("GPL");
3469
3470 module_init(osc_init);
3471 module_exit(osc_exit);