LU-9679 modules: convert MIN/MAX to kernel style
lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

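/*
 * Pack and send an OST_SETATTR request without blocking.  If @rqset is NULL
 * the request is handed straight to ptlrpcd and no reply is interpreted, so
 * @upcall and @cookie may be NULL too; if @rqset is PTLRPCD_SET it still
 * goes to ptlrpcd but with osc_setattr_interpret() attached; otherwise it
 * is added to @rqset and @upcall runs from osc_setattr_interpret().
 */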
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

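/*
 * Send an OST_PUNCH request asynchronously via ptlrpcd.  The attributes
 * describing what to punch travel in @oa, and the result is delivered to
 * @upcall through osc_setattr_interpret().
 */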
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC was never supported,
         * in which we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

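/*
 * Throttle destroy RPCs to at most cl_max_rpcs_in_flight: atomically
 * reserve a slot, and back the increment out if that overshoots the limit.
 * The decrement can itself race with a concurrent increment, in which case
 * the waitqueue is woken so a sleeping destroyer can retry.
 */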
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

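/*
 * Report the client's dirty-cache and grant state in @oa (o_dirty,
 * o_undirty, o_grant, o_dropped) so the server can adjust how much grant
 * it gives this client.
 */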
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, so they may race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

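/*
 * Return grant in excess of @target_bytes to the server.  The excess is
 * carried in an ost_body sent through a KEY_GRANT_SHRINK set_info RPC;
 * if that RPC cannot be sent, the grant is added back to cl_avail_grant.
 */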
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that would
         * negatively impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

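/*
 * Periodic worker that walks the registered clients, shrinks grant for at
 * most GRANT_SHRINK_RPC_BATCH of them per pass, and reschedules itself for
 * the earliest pending cl_next_shrink_grant deadline.
 */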
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must be 0.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
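                /* e.g. with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16,
                 * cl_chunkbits is 16 (16-page chunks, chunk_mask = ~0xf),
                 * so a cl_max_pages_per_rpc of 100 pages is rounded up
                 * to 112 pages (7 chunks). */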
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
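/*
 * Two-level T10-PI checksum: obd_page_dif_generate_buffer() computes the
 * per-sector DIF guard tags for each page, the tags are packed into a
 * scratch page, and the top-level hash (OBD_CKSUM_T10_TOP) is computed
 * over the packed tags rather than over the data itself.
 */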
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
1119         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1120         struct page *__page;
1121         unsigned char *buffer;
1122         __u16 *guard_start;
1123         unsigned int bufsize;
1124         int guard_number;
1125         int used_number = 0;
1126         int used;
1127         u32 cksum;
1128         int rc = 0;
1129         int i = 0;
1130
1131         LASSERT(pg_count > 0);
1132
1133         __page = alloc_page(GFP_KERNEL);
1134         if (__page == NULL)
1135                 return -ENOMEM;
1136
1137         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1138         if (IS_ERR(req)) {
1139                 rc = PTR_ERR(req);
1140                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1141                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1142                 GOTO(out, rc);
1143         }
1144
1145         buffer = kmap(__page);
1146         guard_start = (__u16 *)buffer;
1147         guard_number = PAGE_SIZE / sizeof(*guard_start);
1148         while (nob > 0 && pg_count > 0) {
1149                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1150
1151                 /* corrupt the data before we compute the checksum, to
1152                  * simulate an OST->client data error */
1153                 if (unlikely(i == 0 && opc == OST_READ &&
1154                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1155                         unsigned char *ptr = kmap(pga[i]->pg);
1156                         int off = pga[i]->off & ~PAGE_MASK;
1157
1158                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1159                         kunmap(pga[i]->pg);
1160                 }
1161
1162                 /*
1163                  * The left guard number should be able to hold checksums of a
1164                  * whole page
1165                  */
1166                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1167                                                   pga[i]->off & ~PAGE_MASK,
1168                                                   count,
1169                                                   guard_start + used_number,
1170                                                   guard_number - used_number,
1171                                                   &used, sector_size,
1172                                                   fn);
1173                 if (rc)
1174                         break;
1175
1176                 used_number += used;
1177                 if (used_number == guard_number) {
1178                         cfs_crypto_hash_update_page(req, __page, 0,
1179                                 used_number * sizeof(*guard_start));
1180                         used_number = 0;
1181                 }
1182
1183                 nob -= pga[i]->count;
1184                 pg_count--;
1185                 i++;
1186         }
1187         kunmap(__page);
1188         if (rc)
1189                 GOTO(out, rc);
1190
1191         if (used_number != 0)
1192                 cfs_crypto_hash_update_page(req, __page, 0,
1193                         used_number * sizeof(*guard_start));
1194
1195         bufsize = sizeof(cksum);
1196         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1197
1198         /* For sending we only compute the wrong checksum instead
1199          * of corrupting the data so it is still correct on a redo */
1200         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1201                 cksum++;
1202
1203         *check_sum = cksum;
1204 out:
1205         __free_page(__page);
1206         return rc;
1207 }
1208 #else /* !CONFIG_CRC_T10DIF */
1209 #define obd_dif_ip_fn NULL
1210 #define obd_dif_crc_fn NULL
1211 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1212         -EOPNOTSUPP
1213 #endif /* CONFIG_CRC_T10DIF */
1214
1215 static int osc_checksum_bulk(int nob, size_t pg_count,
1216                              struct brw_page **pga, int opc,
1217                              enum cksum_types cksum_type,
1218                              u32 *cksum)
1219 {
1220         int                             i = 0;
1221         struct ahash_request           *req;
1222         unsigned int                    bufsize;
1223         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1224
1225         LASSERT(pg_count > 0);
1226
1227         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1228         if (IS_ERR(req)) {
1229                 CERROR("Unable to initialize checksum hash %s\n",
1230                        cfs_crypto_hash_name(cfs_alg));
1231                 return PTR_ERR(req);
1232         }
1233
1234         while (nob > 0 && pg_count > 0) {
1235                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1236
1237                 /* corrupt the data before we compute the checksum, to
1238                  * simulate an OST->client data error */
1239                 if (i == 0 && opc == OST_READ &&
1240                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1241                         unsigned char *ptr = kmap(pga[i]->pg);
1242                         int off = pga[i]->off & ~PAGE_MASK;
1243
1244                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1245                         kunmap(pga[i]->pg);
1246                 }
1247                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1248                                             pga[i]->off & ~PAGE_MASK,
1249                                             count);
1250                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1251                                (int)(pga[i]->off & ~PAGE_MASK));
1252
1253                 nob -= pga[i]->count;
1254                 pg_count--;
1255                 i++;
1256         }
1257
1258         bufsize = sizeof(*cksum);
1259         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1260
1261         /* For sending we only compute the wrong checksum instead
1262          * of corrupting the data so it is still correct on a redo */
1263         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1264                 (*cksum)++;
1265
1266         return 0;
1267 }
1268
1269 static int osc_checksum_bulk_rw(const char *obd_name,
1270                                 enum cksum_types cksum_type,
1271                                 int nob, size_t pg_count,
1272                                 struct brw_page **pga, int opc,
1273                                 u32 *check_sum)
1274 {
1275         obd_dif_csum_fn *fn = NULL;
1276         int sector_size = 0;
1277         int rc;
1278
1279         ENTRY;
1280         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1281
1282         if (fn)
1283                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1284                                              opc, fn, sector_size, check_sum);
1285         else
1286                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1287                                        check_sum);
1288
1289         RETURN(rc);
1290 }
1291
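/*
 * Build a BRW (bulk read/write) RPC for @page_count pages described by
 * @pga: adjacent pages are merged into as few remote niobufs as possible,
 * transfers small enough for the short-io path are inlined into the
 * request or reply buffer instead of using a bulk descriptor, and a bulk
 * checksum is attached for writes when checksums are enabled.
 */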
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * oa variable contains valid o_uid and o_gid for these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid
         * breaking other processing logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
1405         if (desc != NULL)
1406                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1407         else /* short io */
1408                 ioobj_max_brw_set(ioobj, 0);
1409
1410         if (short_io_size != 0) {
1411                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1412                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1413                         body->oa.o_flags = 0;
1414                 }
1415                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1416                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1417                        short_io_size);
1418                 if (opc == OST_WRITE) {
1419                         short_io_buf = req_capsule_client_get(pill,
1420                                                               &RMF_SHORT_IO);
1421                         LASSERT(short_io_buf != NULL);
1422                 }
1423         }
1424
1425         LASSERT(page_count > 0);
1426         pg_prev = pga[0];
1427         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1428                 struct brw_page *pg = pga[i];
1429                 int poff = pg->off & ~PAGE_MASK;
1430
1431                 LASSERT(pg->count > 0);
1432                 /* make sure there is no gap in the middle of page array */
1433                 LASSERTF(page_count == 1 ||
1434                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1435                           ergo(i > 0 && i < page_count - 1,
1436                                poff == 0 && pg->count == PAGE_SIZE)   &&
1437                           ergo(i == page_count - 1, poff == 0)),
1438                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1439                          i, page_count, pg, pg->off, pg->count);
1440                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1441                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1442                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1443                          i, page_count,
1444                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1445                          pg_prev->pg, page_private(pg_prev->pg),
1446                          pg_prev->pg->index, pg_prev->off);
1447                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1448                         (pg->flag & OBD_BRW_SRVLOCK));
1449                 if (short_io_size != 0 && opc == OST_WRITE) {
1450                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1451
1452                         LASSERT(short_io_size >= requested_nob + pg->count);
1453                         memcpy(short_io_buf + requested_nob,
1454                                ptr + poff,
1455                                pg->count);
1456                         ll_kunmap_atomic(ptr, KM_USER0);
1457                 } else if (short_io_size == 0) {
1458                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1459                                                          pg->count);
1460                 }
1461                 requested_nob += pg->count;
1462
1463                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1464                         niobuf--;
1465                         niobuf->rnb_len += pg->count;
1466                 } else {
1467                         niobuf->rnb_offset = pg->off;
1468                         niobuf->rnb_len    = pg->count;
1469                         niobuf->rnb_flags  = pg->flag;
1470                 }
1471                 pg_prev = pg;
1472         }
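        /*
         * The shape of the merge test above, as a sketch; the real
         * can_merge_pages() is defined earlier in this file.  Two pages
         * share one remote niobuf when the first ends exactly where the
         * second begins and their BRW flags are compatible
         * (brw_flags_compatible() is a hypothetical stand-in):
         */
#if 0
        bool mergeable = (pg_prev->off + pg_prev->count == pg->off) &&
                         brw_flags_compatible(pg_prev->flag, pg->flag);
#endif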
1473
1474         LASSERTF((void *)(niobuf - niocount) ==
1475                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1476                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1477                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1478
1479         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1480         if (resend) {
1481                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1482                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1483                         body->oa.o_flags = 0;
1484                 }
1485                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1486         }
1487
1488         if (osc_should_shrink_grant(cli))
1489                 osc_shrink_grant_local(cli, &body->oa);
1490
1491         /* size[REQ_REC_OFF] is still sizeof(*body) */
1492         if (opc == OST_WRITE) {
1493                 if (cli->cl_checksum &&
1494                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1495                         /* store cl_cksum_type in a local variable since
1496                          * it can be changed via lprocfs */
1497                         enum cksum_types cksum_type = cli->cl_cksum_type;
1498
1499                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1500                                 body->oa.o_flags = 0;
1501
1502                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1503                                                                 cksum_type);
1504                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1505
1506                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1507                                                   requested_nob, page_count,
1508                                                   pga, OST_WRITE,
1509                                                   &body->oa.o_cksum);
1510                         if (rc < 0) {
1511                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1512                                        rc);
1513                                 GOTO(out, rc);
1514                         }
1515                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1516                                body->oa.o_cksum);
1517
1518                         /* save this in 'oa', too, for later checking */
1519                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1520                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1521                                                            cksum_type);
1522                 } else {
1523                         /* clear out the checksum flag, in case this is a
1524                          * resend but cl_checksum is no longer set. b=11238 */
1525                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1526                 }
1527                 oa->o_cksum = body->oa.o_cksum;
1528                 /* 1 RC per niobuf */
1529                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1530                                      sizeof(__u32) * niocount);
1531         } else {
1532                 if (cli->cl_checksum &&
1533                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1534                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1535                                 body->oa.o_flags = 0;
1536                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1537                                 cli->cl_cksum_type);
1538                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1539                 }
1540
1541                 /* The client cksum has already been copied to the wire obdo by
1542                  * the earlier lustre_set_wire_obdo(), so if a bulk read is being
1543                  * resent due to a cksum error, this allows the server to
1544                  * check+dump the pages on its side. */
1545         }
1546         ptlrpc_request_set_replen(req);
1547
1548         aa = ptlrpc_req_async_args(aa, req);
1549         aa->aa_oa = oa;
1550         aa->aa_requested_nob = requested_nob;
1551         aa->aa_nio_count = niocount;
1552         aa->aa_page_count = page_count;
1553         aa->aa_resends = 0;
1554         aa->aa_ppga = pga;
1555         aa->aa_cli = cli;
1556         INIT_LIST_HEAD(&aa->aa_oaps);
1557
1558         *reqp = req;
1559         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1560         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1561                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1562                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1563         RETURN(0);
1564
1565  out:
1566         ptlrpc_req_finished(req);
1567         RETURN(rc);
1568 }
1569
1570 char dbgcksum_file_name[PATH_MAX];
1571
1572 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1573                                 struct brw_page **pga, __u32 server_cksum,
1574                                 __u32 client_cksum)
1575 {
1576         struct file *filp;
1577         int rc, i;
1578         unsigned int len;
1579         char *buf;
1580
1581         /* only keep a dump of the pages from the first error for a given
1582          * range in the file/fid, not from the resends/retries. */
1583         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1584                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1585                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1586                   libcfs_debug_file_path_arr :
1587                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1591                  pga[0]->off,
1592                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1593                  client_cksum, server_cksum);
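        /* e.g. (values hypothetical): /tmp/lustre-log-checksum_dump-osc-
         * [0x200000401:0x12:0x0]:[0-1048575]-3a5f0c11-9e2b4d78, i.e. the
         * debug path, parent FID, byte range, then the client and server
         * checksums */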
1594         filp = filp_open(dbgcksum_file_name,
1595                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1596         if (IS_ERR(filp)) {
1597                 rc = PTR_ERR(filp);
1598                 if (rc == -EEXIST)
1599                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1600                                "checksum error: rc = %d\n", dbgcksum_file_name,
1601                                rc);
1602                 else
1603                         CERROR("%s: can't open to dump pages with checksum "
1604                                "error: rc = %d\n", dbgcksum_file_name, rc);
1605                 return;
1606         }
1607
1608         for (i = 0; i < page_count; i++) {
1609                 len = pga[i]->count;
1610                 buf = kmap(pga[i]->pg);
1611                 while (len != 0) {
1612                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1613                         if (rc < 0) {
1614                                 CERROR("%s: wanted to write %u bytes but got "
1615                                        "error %d\n", dbgcksum_file_name, len, rc);
1616                                 break;
1617                         }
1618                         len -= rc;
1619                         buf += rc;
1620                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1621                                dbgcksum_file_name, rc);
1622                 }
1623                 kunmap(pga[i]->pg);
1624         }
1625
1626         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1627         if (rc)
1628                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1629         filp_close(filp, NULL);
1630 }
1631
1632 static int
1633 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1634                      __u32 client_cksum, __u32 server_cksum,
1635                      struct osc_brw_async_args *aa)
1636 {
1637         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1638         enum cksum_types cksum_type;
1639         obd_dif_csum_fn *fn = NULL;
1640         int sector_size = 0;
1641         __u32 new_cksum;
1642         char *msg;
1643         int rc;
1644
1645         if (server_cksum == client_cksum) {
1646                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1647                 return 0;
1648         }
1649
1650         if (aa->aa_cli->cl_checksum_dump)
1651                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1652                                     server_cksum, client_cksum);
1653
1654         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1655                                            oa->o_flags : 0);
1656
1657         switch (cksum_type) {
1658         case OBD_CKSUM_T10IP512:
1659                 fn = obd_dif_ip_fn;
1660                 sector_size = 512;
1661                 break;
1662         case OBD_CKSUM_T10IP4K:
1663                 fn = obd_dif_ip_fn;
1664                 sector_size = 4096;
1665                 break;
1666         case OBD_CKSUM_T10CRC512:
1667                 fn = obd_dif_crc_fn;
1668                 sector_size = 512;
1669                 break;
1670         case OBD_CKSUM_T10CRC4K:
1671                 fn = obd_dif_crc_fn;
1672                 sector_size = 4096;
1673                 break;
1674         default:
1675                 break;
1676         }
1677
1678         if (fn)
1679                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1680                                              aa->aa_page_count, aa->aa_ppga,
1681                                              OST_WRITE, fn, sector_size,
1682                                              &new_cksum);
1683         else
1684                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1685                                        aa->aa_ppga, OST_WRITE, cksum_type,
1686                                        &new_cksum);
1687
1688         if (rc < 0)
1689                 msg = "failed to calculate the client write checksum";
1690         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1691                 msg = "the server did not use the checksum type specified in "
1692                       "the original request - likely a protocol problem";
1693         else if (new_cksum == server_cksum)
1694                 msg = "changed on the client after we checksummed it - "
1695                       "likely false positive due to mmap IO (bug 11742)";
1696         else if (new_cksum == client_cksum)
1697                 msg = "changed in transit before arrival at OST";
1698         else
1699                 msg = "changed in transit AND doesn't match the original - "
1700                       "likely false positive due to mmap IO (bug 11742)";
1701
1702         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1703                            DFID " object "DOSTID" extent [%llu-%llu], original "
1704                            "client csum %x (type %x), server csum %x (type %x),"
1705                            " client csum now %x\n",
1706                            obd_name, msg, libcfs_nid2str(peer->nid),
1707                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1710                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1711                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1712                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1713                            client_cksum,
1714                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1715                            server_cksum, cksum_type, new_cksum);
1716         return 1;
1717 }
1718
1719 /* Note rc enters this function as the number of bytes transferred */
1720 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1721 {
1722         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1723         struct client_obd *cli = aa->aa_cli;
1724         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1725         const struct lnet_process_id *peer =
1726                 &req->rq_import->imp_connection->c_peer;
1727         struct ost_body *body;
1728         u32 client_cksum = 0;
1729
1730         ENTRY;
1731
1732         if (rc < 0 && rc != -EDQUOT) {
1733                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1734                 RETURN(rc);
1735         }
1736
1737         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1738         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1739         if (body == NULL) {
1740                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1741                 RETURN(-EPROTO);
1742         }
1743
1744         /* set/clear the over-quota flag for a uid/gid/projid */
1745         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1746             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1747                 unsigned qid[LL_MAXQUOTAS] = {
1748                                          body->oa.o_uid, body->oa.o_gid,
1749                                          body->oa.o_projid };
1750                 CDEBUG(D_QUOTA,
1751                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1752                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1753                        body->oa.o_valid, body->oa.o_flags);
1754                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1755                                 body->oa.o_flags);
1756         }
1757
1758         osc_update_grant(cli, body);
1759
1760         if (rc < 0)
1761                 RETURN(rc);
1762
1763         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1764                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1765
1766         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1767                 if (rc > 0) {
1768                         CERROR("%s: unexpected positive size %d\n",
1769                                obd_name, rc);
1770                         RETURN(-EPROTO);
1771                 }
1772
1773                 if (req->rq_bulk != NULL &&
1774                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1775                         RETURN(-EAGAIN);
1776
1777                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1778                     check_write_checksum(&body->oa, peer, client_cksum,
1779                                          body->oa.o_cksum, aa))
1780                         RETURN(-EAGAIN);
1781
1782                 rc = check_write_rcs(req, aa->aa_requested_nob,
1783                                      aa->aa_nio_count, aa->aa_page_count,
1784                                      aa->aa_ppga);
1785                 GOTO(out, rc);
1786         }
1787
1788         /* The rest of this function executes only for OST_READs */
1789
1790         if (req->rq_bulk == NULL) {
1791                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1792                                           RCL_SERVER);
1793                 LASSERT(rc == req->rq_status);
1794         } else {
1795                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1796                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1797         }
1798         if (rc < 0)
1799                 GOTO(out, rc = -EAGAIN);
1800
1801         if (rc > aa->aa_requested_nob) {
1802                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1803                        rc, aa->aa_requested_nob);
1804                 RETURN(-EPROTO);
1805         }
1806
1807         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1808                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1809                        rc, req->rq_bulk->bd_nob_transferred);
1810                 RETURN(-EPROTO);
1811         }
1812
1813         if (req->rq_bulk == NULL) {
1814                 /* short io */
1815                 int nob, pg_count, i = 0;
1816                 unsigned char *buf;
1817
1818                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1819                 pg_count = aa->aa_page_count;
1820                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1821                                                    rc);
1822                 nob = rc;
1823                 while (nob > 0 && pg_count > 0) {
1824                         unsigned char *ptr;
1825                         int count = aa->aa_ppga[i]->count > nob ?
1826                                     nob : aa->aa_ppga[i]->count;
1827
1828                         CDEBUG(D_CACHE, "page %p count %d\n",
1829                                aa->aa_ppga[i]->pg, count);
1830                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1831                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1832                                count);
1833                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1834
1835                         buf += count;
1836                         nob -= count;
1837                         i++;
1838                         pg_count--;
1839                 }
1840         }
1841
1842         if (rc < aa->aa_requested_nob)
1843                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1844
1845         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1846                 static int cksum_counter;
1847                 u32        server_cksum = body->oa.o_cksum;
1848                 char      *via = "";
1849                 char      *router = "";
1850                 enum cksum_types cksum_type;
1851                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1852                         body->oa.o_flags : 0;
1853
1854                 cksum_type = obd_cksum_type_unpack(o_flags);
1855                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1856                                           aa->aa_page_count, aa->aa_ppga,
1857                                           OST_READ, &client_cksum);
1858                 if (rc < 0)
1859                         GOTO(out, rc);
1860
1861                 if (req->rq_bulk != NULL &&
1862                     peer->nid != req->rq_bulk->bd_sender) {
1863                         via = " via ";
1864                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1865                 }
1866
1867                 if (server_cksum != client_cksum) {
1868                         struct ost_body *clbody;
1869                         u32 page_count = aa->aa_page_count;
1870
1871                         clbody = req_capsule_client_get(&req->rq_pill,
1872                                                         &RMF_OST_BODY);
1873                         if (cli->cl_checksum_dump)
1874                                 dump_all_bulk_pages(&clbody->oa, page_count,
1875                                                     aa->aa_ppga, server_cksum,
1876                                                     client_cksum);
1877
1878                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1879                                            "%s%s%s inode "DFID" object "DOSTID
1880                                            " extent [%llu-%llu], client %x, "
1881                                            "server %x, cksum_type %x\n",
1882                                            obd_name,
1883                                            libcfs_nid2str(peer->nid),
1884                                            via, router,
1885                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1886                                                 clbody->oa.o_parent_seq : 0ULL,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_oid : 0,
1889                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1890                                                 clbody->oa.o_parent_ver : 0,
1891                                            POSTID(&body->oa.o_oi),
1892                                            aa->aa_ppga[0]->off,
1893                                            aa->aa_ppga[page_count-1]->off +
1894                                            aa->aa_ppga[page_count-1]->count - 1,
1895                                            client_cksum, server_cksum,
1896                                            cksum_type);
1897                         cksum_counter = 0;
1898                         aa->aa_oa->o_cksum = client_cksum;
1899                         rc = -EAGAIN;
1900                 } else {
1901                         cksum_counter++;
1902                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1903                         rc = 0;
1904                 }
1905         } else if (unlikely(client_cksum)) {
1906                 static int cksum_missed;
1907
1908                 cksum_missed++;
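                /* (x & -x) == x only when x is a power of two, so the
                 * CERROR below fires on the 1st, 2nd, 4th, 8th, ... miss,
                 * i.e. at exponentially decreasing frequency */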
1909                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1910                         CERROR("%s: checksum %u requested from %s but not sent\n",
1911                                obd_name, cksum_missed,
1912                                libcfs_nid2str(peer->nid));
1913         } else {
1914                 rc = 0;
1915         }
1916 out:
1917         if (rc >= 0)
1918                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1919                                      aa->aa_oa, &body->oa);
1920
1921         RETURN(rc);
1922 }
1923
1924 static int osc_brw_redo_request(struct ptlrpc_request *request,
1925                                 struct osc_brw_async_args *aa, int rc)
1926 {
1927         struct ptlrpc_request *new_req;
1928         struct osc_brw_async_args *new_aa;
1929         struct osc_async_page *oap;
1930         ENTRY;
1931
1932         /* The message below is checked in replay-ost-single.sh test_8ae */
1933         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1934                   "redo for recoverable error %d", rc);
1935
1936         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1937                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1938                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1939                                   aa->aa_ppga, &new_req, 1);
1940         if (rc)
1941                 RETURN(rc);
1942
1943         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1944                 if (oap->oap_request != NULL) {
1945                         LASSERTF(request == oap->oap_request,
1946                                  "request %p != oap_request %p\n",
1947                                  request, oap->oap_request);
1948                         if (oap->oap_interrupted) {
1949                                 ptlrpc_req_finished(new_req);
1950                                 RETURN(-EINTR);
1951                         }
1952                 }
1953         }
1954         /*
1955          * The new request takes over pga and oaps from the old request.
1956          * Note that copying a list_head doesn't work (entries would still
1957          * point at the old head); the list must be moved with a splice. */
1958         aa->aa_resends++;
1959         new_req->rq_interpret_reply = request->rq_interpret_reply;
1960         new_req->rq_async_args = request->rq_async_args;
1961         new_req->rq_commit_cb = request->rq_commit_cb;
1962         /* cap the resend delay (one second per resend so far) at the current
1963          * request timeout; this is similar to what ptlrpc does, see after_reply() */
1964         if (aa->aa_resends > new_req->rq_timeout)
1965                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1966         else
1967                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1968         new_req->rq_generation_set = 1;
1969         new_req->rq_import_generation = request->rq_import_generation;
1970
1971         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1972
1973         INIT_LIST_HEAD(&new_aa->aa_oaps);
1974         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1975         INIT_LIST_HEAD(&new_aa->aa_exts);
1976         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1977         new_aa->aa_resends = aa->aa_resends;
1978
1979         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1980                 if (oap->oap_request) {
1981                         ptlrpc_req_finished(oap->oap_request);
1982                         oap->oap_request = ptlrpc_request_addref(new_req);
1983                 }
1984         }
1985
1986         /* XXX: This code will run into problems if we ever support adding
1987          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1988          * waiting for all of them to finish. We should inherit the request
1989          * set from the old request. */
1990         ptlrpcd_add_req(new_req);
1991
1992         DEBUG_REQ(D_INFO, new_req, "new request");
1993         RETURN(0);
1994 }
1995
1996 /*
1997  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1998  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1999  * fine for our small page arrays and doesn't require allocation.  It's an
2000  * insertion sort that swaps elements that are strides apart, shrinking the
2001  * stride down until it's 1 and the array is sorted.
2002  */
2003 static void sort_brw_pages(struct brw_page **array, int num)
2004 {
2005         int stride, i, j;
2006         struct brw_page *tmp;
2007
2008         if (num == 1)
2009                 return;
2010         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2011                 ;
2012
2013         do {
2014                 stride /= 3;
2015                 for (i = stride ; i < num ; i++) {
2016                         tmp = array[i];
2017                         j = i;
2018                         while (j >= stride && array[j - stride]->off > tmp->off) {
2019                                 array[j] = array[j - stride];
2020                                 j -= stride;
2021                         }
2022                         array[j] = tmp;
2023                 }
2024         } while (stride > 1);
2025 }
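/*
 * For illustration, the same algorithm as a standalone sketch over plain
 * integers (Knuth's gap sequence 1, 4, 13, 40, ... from h := 3h + 1,
 * walked back down by dividing by 3):
 */
#if 0
static void shellsort_ints(int *a, int n)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < n; stride = stride * 3 + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < n; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}
#endif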
2026
2027 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2028 {
2029         LASSERT(ppga != NULL);
2030         OBD_FREE(ppga, sizeof(*ppga) * count);
2031 }
2032
2033 static int brw_interpret(const struct lu_env *env,
2034                          struct ptlrpc_request *req, void *args, int rc)
2035 {
2036         struct osc_brw_async_args *aa = args;
2037         struct osc_extent *ext;
2038         struct osc_extent *tmp;
2039         struct client_obd *cli = aa->aa_cli;
2040         unsigned long transferred = 0;
2041
2042         ENTRY;
2043
2044         rc = osc_brw_fini_request(req, rc);
2045         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2046         /*
2047          * When server returns -EINPROGRESS, client should always retry
2048          * regardless of the number of times the bulk was resent already.
2049          */
2050         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2051                 if (req->rq_import_generation !=
2052                     req->rq_import->imp_generation) {
2053                         CDEBUG(D_HA, "%s: resend crosses eviction for object: "
2054                                ""DOSTID", rc = %d.\n",
2055                                req->rq_import->imp_obd->obd_name,
2056                                POSTID(&aa->aa_oa->o_oi), rc);
2057                 } else if (rc == -EINPROGRESS ||
2058                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2059                         rc = osc_brw_redo_request(req, aa, rc);
2060                 } else {
2061                         CERROR("%s: too many resend retries for object: "
2062                                "%llu:%llu, rc = %d.\n",
2063                                req->rq_import->imp_obd->obd_name,
2064                                POSTID(&aa->aa_oa->o_oi), rc);
2065                 }
2066
2067                 if (rc == 0)
2068                         RETURN(0);
2069                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2070                         rc = -EIO;
2071         }
2072
2073         if (rc == 0) {
2074                 struct obdo *oa = aa->aa_oa;
2075                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2076                 unsigned long valid = 0;
2077                 struct cl_object *obj;
2078                 struct osc_async_page *last;
2079
2080                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2081                 obj = osc2cl(last->oap_obj);
2082
2083                 cl_object_attr_lock(obj);
2084                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2085                         attr->cat_blocks = oa->o_blocks;
2086                         valid |= CAT_BLOCKS;
2087                 }
2088                 if (oa->o_valid & OBD_MD_FLMTIME) {
2089                         attr->cat_mtime = oa->o_mtime;
2090                         valid |= CAT_MTIME;
2091                 }
2092                 if (oa->o_valid & OBD_MD_FLATIME) {
2093                         attr->cat_atime = oa->o_atime;
2094                         valid |= CAT_ATIME;
2095                 }
2096                 if (oa->o_valid & OBD_MD_FLCTIME) {
2097                         attr->cat_ctime = oa->o_ctime;
2098                         valid |= CAT_CTIME;
2099                 }
2100
2101                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2102                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2103                         loff_t last_off = last->oap_count + last->oap_obj_off +
2104                                 last->oap_page_off;
2105
2106                         /* Change the file size if this is an out-of-quota or
2107                          * direct I/O write and it extends the file size */
2108                         if (loi->loi_lvb.lvb_size < last_off) {
2109                                 attr->cat_size = last_off;
2110                                 valid |= CAT_SIZE;
2111                         }
2112                         /* Extend KMS (known minimum size) if it's not a lockless write */
2113                         if (loi->loi_kms < last_off &&
2114                             oap2osc_page(last)->ops_srvlock == 0) {
2115                                 attr->cat_kms = last_off;
2116                                 valid |= CAT_KMS;
2117                         }
2118                 }
2119
2120                 if (valid != 0)
2121                         cl_object_attr_update(env, obj, attr, valid);
2122                 cl_object_attr_unlock(obj);
2123         }
2124         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2125         aa->aa_oa = NULL;
2126
2127         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2128                 osc_inc_unstable_pages(req);
2129
2130         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2131                 list_del_init(&ext->oe_link);
2132                 osc_extent_finish(env, ext, 1,
2133                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2134         }
2135         LASSERT(list_empty(&aa->aa_exts));
2136         LASSERT(list_empty(&aa->aa_oaps));
2137
2138         transferred = (req->rq_bulk == NULL ? /* short io */
2139                        aa->aa_requested_nob :
2140                        req->rq_bulk->bd_nob_transferred);
2141
2142         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2143         ptlrpc_lprocfs_brw(req, transferred);
2144
2145         spin_lock(&cli->cl_loi_list_lock);
2146         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2147          * is called so we know whether to go to sync BRWs or wait for more
2148          * RPCs to complete */
2149         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2150                 cli->cl_w_in_flight--;
2151         else
2152                 cli->cl_r_in_flight--;
2153         osc_wake_cache_waiters(cli);
2154         spin_unlock(&cli->cl_loi_list_lock);
2155
2156         osc_io_unplug(env, cli, NULL);
2157         RETURN(rc);
2158 }
2159
2160 static void brw_commit(struct ptlrpc_request *req)
2161 {
2162         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2163          * this function being called via the rq_commit_cb, we need to
2164          * ensure osc_dec_unstable_pages is still called. Otherwise
2165          * unstable pages may be leaked. */
2166         spin_lock(&req->rq_lock);
2167         if (likely(req->rq_unstable)) {
2168                 req->rq_unstable = 0;
2169                 spin_unlock(&req->rq_lock);
2170
2171                 osc_dec_unstable_pages(req);
2172         } else {
2173                 req->rq_committed = 1;
2174                 spin_unlock(&req->rq_lock);
2175         }
2176 }
2177
2178 /**
2179  * Build an RPC from the list of extents @ext_list. The caller must ensure
2180  * that the total number of pages in this list does not exceed the max
2181  * pages per RPC. Extents in the list must be in OES_RPC state.
2182  */
2183 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2184                   struct list_head *ext_list, int cmd)
2185 {
2186         struct ptlrpc_request           *req = NULL;
2187         struct osc_extent               *ext;
2188         struct brw_page                 **pga = NULL;
2189         struct osc_brw_async_args       *aa = NULL;
2190         struct obdo                     *oa = NULL;
2191         struct osc_async_page           *oap;
2192         struct osc_object               *obj = NULL;
2193         struct cl_req_attr              *crattr = NULL;
2194         loff_t                          starting_offset = OBD_OBJECT_EOF;
2195         loff_t                          ending_offset = 0;
2196         int                             mpflag = 0;
2197         int                             mem_tight = 0;
2198         int                             page_count = 0;
2199         bool                            soft_sync = false;
2200         bool                            interrupted = false;
2201         bool                            ndelay = false;
2202         int                             i;
2203         int                             grant = 0;
2204         int                             rc;
2205         __u32                           layout_version = 0;
2206         LIST_HEAD(rpc_list);
2207         struct ost_body                 *body;
2208         ENTRY;
2209         LASSERT(!list_empty(ext_list));
2210
2211         /* add pages into rpc_list to build BRW rpc */
2212         list_for_each_entry(ext, ext_list, oe_link) {
2213                 LASSERT(ext->oe_state == OES_RPC);
2214                 mem_tight |= ext->oe_memalloc;
2215                 grant += ext->oe_grants;
2216                 page_count += ext->oe_nr_pages;
2217                 layout_version = max(layout_version, ext->oe_layout_version);
2218                 if (obj == NULL)
2219                         obj = ext->oe_obj;
2220         }
2221
2222         soft_sync = osc_over_unstable_soft_limit(cli);
2223         if (mem_tight)
2224                 mpflag = cfs_memory_pressure_get_and_set();
2225
2226         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2227         if (pga == NULL)
2228                 GOTO(out, rc = -ENOMEM);
2229
2230         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2231         if (oa == NULL)
2232                 GOTO(out, rc = -ENOMEM);
2233
2234         i = 0;
2235         list_for_each_entry(ext, ext_list, oe_link) {
2236                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2237                         if (mem_tight)
2238                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2239                         if (soft_sync)
2240                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2241                         pga[i] = &oap->oap_brw_page;
2242                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2243                         i++;
2244
2245                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2246                         if (starting_offset == OBD_OBJECT_EOF ||
2247                             starting_offset > oap->oap_obj_off)
2248                                 starting_offset = oap->oap_obj_off;
2249                         else
2250                                 LASSERT(oap->oap_page_off == 0);
2251                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2252                                 ending_offset = oap->oap_obj_off +
2253                                                 oap->oap_count;
2254                         else
2255                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2256                                         PAGE_SIZE);
2257                         if (oap->oap_interrupted)
2258                                 interrupted = true;
2259                 }
2260                 if (ext->oe_ndelay)
2261                         ndelay = true;
2262         }
2263
2264         /* first page in the list */
2265         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2266
2267         crattr = &osc_env_info(env)->oti_req_attr;
2268         memset(crattr, 0, sizeof(*crattr));
2269         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2270         crattr->cra_flags = ~0ULL;
2271         crattr->cra_page = oap2cl_page(oap);
2272         crattr->cra_oa = oa;
2273         cl_req_attr_set(env, osc2cl(obj), crattr);
2274
2275         if (cmd == OBD_BRW_WRITE) {
2276                 oa->o_grant_used = grant;
2277                 if (layout_version > 0) {
2278                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2279                                PFID(&oa->o_oi.oi_fid), layout_version);
2280
2281                         oa->o_layout_version = layout_version;
2282                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2283                 }
2284         }
2285
2286         sort_brw_pages(pga, page_count);
2287         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2288         if (rc != 0) {
2289                 CERROR("prep_req failed: %d\n", rc);
2290                 GOTO(out, rc);
2291         }
2292
2293         req->rq_commit_cb = brw_commit;
2294         req->rq_interpret_reply = brw_interpret;
2295         req->rq_memalloc = mem_tight != 0;
2296         oap->oap_request = ptlrpc_request_addref(req);
2297         if (interrupted && !req->rq_intr)
2298                 ptlrpc_mark_interrupted(req);
2299         if (ndelay) {
2300                 req->rq_no_resend = req->rq_no_delay = 1;
2301                 /* we should probably set a shorter timeout value here to
2302                  * handle ETIMEDOUT in brw_interpret() correctly. */
2303                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2304         }
2305
2306         /* Need to update the timestamps after the request is built in case
2307          * we race with setattr (locally or in the queue at the OST).  If the
2308          * OST gets a later setattr before an earlier BRW (as determined by
2309          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2310          * there is no obvious way to do this in a single call.  bug 10150 */
2311         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2312         crattr->cra_oa = &body->oa;
2313         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2314         cl_req_attr_set(env, osc2cl(obj), crattr);
2315         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2316
2317         aa = ptlrpc_req_async_args(aa, req);
2318         INIT_LIST_HEAD(&aa->aa_oaps);
2319         list_splice_init(&rpc_list, &aa->aa_oaps);
2320         INIT_LIST_HEAD(&aa->aa_exts);
2321         list_splice_init(ext_list, &aa->aa_exts);
2322
2323         spin_lock(&cli->cl_loi_list_lock);
2324         starting_offset >>= PAGE_SHIFT;
2325         if (cmd == OBD_BRW_READ) {
2326                 cli->cl_r_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2330                                       starting_offset + 1);
2331         } else {
2332                 cli->cl_w_in_flight++;
2333                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2334                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2335                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2336                                       starting_offset + 1);
2337         }
2338         spin_unlock(&cli->cl_loi_list_lock);
2339
2340         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2341                   page_count, aa, cli->cl_r_in_flight,
2342                   cli->cl_w_in_flight);
2343         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2344
2345         ptlrpcd_add_req(req);
2346         rc = 0;
2347         EXIT;
2348
2349 out:
2350         if (mem_tight != 0)
2351                 cfs_memory_pressure_restore(mpflag);
2352
2353         if (rc != 0) {
2354                 LASSERT(req == NULL);
2355
2356                 if (oa)
2357                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2358                 if (pga)
2359                         OBD_FREE(pga, sizeof(*pga) * page_count);
2360                 /* this should happen rarely and is pretty bad; it makes the
2361                  * pending list not follow the dirty order */
2362                 while (!list_empty(ext_list)) {
2363                         ext = list_entry(ext_list->next, struct osc_extent,
2364                                          oe_link);
2365                         list_del_init(&ext->oe_link);
2366                         osc_extent_finish(env, ext, 0, rc);
2367                 }
2368         }
2369         RETURN(rc);
2370 }
2371
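/*
 * Attach @data as the lock's l_ast_data if the lock is not already owned.
 * Returns 1 if l_ast_data now equals @data (freshly set, or it was already
 * ours), 0 if another object owns the lock.
 */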
2372 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2373 {
2374         int set = 0;
2375
2376         LASSERT(lock != NULL);
2377
2378         lock_res_and_lock(lock);
2379
2380         if (lock->l_ast_data == NULL)
2381                 lock->l_ast_data = data;
2382         if (lock->l_ast_data == data)
2383                 set = 1;
2384
2385         unlock_res_and_lock(lock);
2386
2387         return set;
2388 }
2389
2390 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2391                      void *cookie, struct lustre_handle *lockh,
2392                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2393                      int errcode)
2394 {
2395         bool intent = *flags & LDLM_FL_HAS_INTENT;
2396         int rc;
2397         ENTRY;
2398
2399         /* The request was created before ldlm_cli_enqueue call. */
2400         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2401                 struct ldlm_reply *rep;
2402
2403                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2404                 LASSERT(rep != NULL);
2405
2406                 rep->lock_policy_res1 =
2407                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2408                 if (rep->lock_policy_res1)
2409                         errcode = rep->lock_policy_res1;
2410                 if (!speculative)
2411                         *flags |= LDLM_FL_LVB_READY;
2412         } else if (errcode == ELDLM_OK) {
2413                 *flags |= LDLM_FL_LVB_READY;
2414         }
2415
2416         /* Call the update callback. */
2417         rc = (*upcall)(cookie, lockh, errcode);
2418
2419         /* release the reference taken in ldlm_cli_enqueue() */
2420         if (errcode == ELDLM_LOCK_MATCHED)
2421                 errcode = ELDLM_OK;
2422         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2423                 ldlm_lock_decref(lockh, mode);
2424
2425         RETURN(rc);
2426 }
2427
2428 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2429                           void *args, int rc)
2430 {
2431         struct osc_enqueue_args *aa = args;
2432         struct ldlm_lock *lock;
2433         struct lustre_handle *lockh = &aa->oa_lockh;
2434         enum ldlm_mode mode = aa->oa_mode;
2435         struct ost_lvb *lvb = aa->oa_lvb;
2436         __u32 lvb_len = sizeof(*lvb);
2437         __u64 flags = 0;
2438
2439         ENTRY;
2440
2441         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2442          * be valid. */
2443         lock = ldlm_handle2lock(lockh);
2444         LASSERTF(lock != NULL,
2445                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2446                  lockh->cookie, req, aa);
2447
2448         /* Take an additional reference so that a blocking AST that
2449          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2450          * to arrive after an upcall has been executed by
2451          * osc_enqueue_fini(). */
2452         ldlm_lock_addref(lockh, mode);
2453
2454         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2455         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2456
2457         /* Let the CP AST grant the lock first. */
2458         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2459
2460         if (aa->oa_speculative) {
2461                 LASSERT(aa->oa_lvb == NULL);
2462                 LASSERT(aa->oa_flags == NULL);
2463                 aa->oa_flags = &flags;
2464         }
2465
2466         /* Complete obtaining the lock procedure. */
2467         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2468                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2469                                    lockh, rc);
2470         /* Complete osc stuff. */
2471         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2472                               aa->oa_flags, aa->oa_speculative, rc);
2473
2474         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2475
2476         ldlm_lock_decref(lockh, mode);
2477         LDLM_LOCK_PUT(lock);
2478         RETURN(rc);
2479 }
2480
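/* Sentinel request set, compared by address and never dereferenced:
 * passing PTLRPCD_SET to osc_enqueue_base() means "hand the request
 * straight to ptlrpcd" instead of adding it to a real set. */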
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
2483 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2484  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2485  * with other synchronous requests, but keeping some locks while trying to
2486  * obtain others may take a considerable amount of time in the case of OST
2487  * failure; and when a client does not release locks needed by other sync
2488  * requests, it is evicted from the cluster -- such scenarios make life
2489  * difficult, so release locks just after they are obtained. */
2490 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2491                      __u64 *flags, union ldlm_policy_data *policy,
2492                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2493                      void *cookie, struct ldlm_enqueue_info *einfo,
2494                      struct ptlrpc_request_set *rqset, int async,
2495                      bool speculative)
2496 {
2497         struct obd_device *obd = exp->exp_obd;
2498         struct lustre_handle lockh = { 0 };
2499         struct ptlrpc_request *req = NULL;
2500         int intent = *flags & LDLM_FL_HAS_INTENT;
2501         __u64 match_flags = *flags;
2502         enum ldlm_mode mode;
2503         int rc;
2504         ENTRY;
2505
2506         /* Filesystem lock extents are extended to page boundaries so that
2507          * dealing with the page cache is a little smoother.  */
2508         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2509         policy->l_extent.end |= ~PAGE_MASK;
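        /*
         * Worked example with PAGE_SIZE 4096 (values hypothetical): a
         * request for bytes [3000, 5000] is widened to [0, 8191]:
         *
         *   start: 3000 - (3000 & 4095) = 0     (round down to page start)
         *   end:   5000 | 4095          = 8191  (round up to page end)
         */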
2510
2511         /* Next, search for already existing extent locks that will cover us */
2512         /* If we're trying to read, we also search for an existing PW lock.  The
2513          * VFS and page cache already protect us locally, so lots of readers/
2514          * writers can share a single PW lock.
2515          *
2516          * There are problems with conversion deadlocks, so instead of
2517          * converting a read lock to a write lock, we'll just enqueue a new
2518          * one.
2519          *
2520          * At some point we should cancel the read lock instead of making them
2521          * send us a blocking callback, but there are problems with canceling
2522          * locks out from other users right now, too. */
2523         mode = einfo->ei_mode;
2524         if (einfo->ei_mode == LCK_PR)
2525                 mode |= LCK_PW;
2526         /* Normal lock requests must wait for the LVB to be ready before
2527          * matching a lock; speculative lock requests do not need to,
2528          * because they will not actually use the lock. */
2529         if (!speculative)
2530                 match_flags |= LDLM_FL_LVB_READY;
2531         if (intent != 0)
2532                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2533         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2534                                einfo->ei_type, policy, mode, &lockh, 0);
2535         if (mode) {
2536                 struct ldlm_lock *matched;
2537
2538                 if (*flags & LDLM_FL_TEST_LOCK)
2539                         RETURN(ELDLM_OK);
2540
2541                 matched = ldlm_handle2lock(&lockh);
2542                 if (speculative) {
2543                         /* This DLM lock request is speculative, and does not
2544                          * have an associated IO request. Therefore if there
2545                          * is already a DLM lock, it will just inform the
2546                          * caller to cancel the request for this stripe. */
2547                         lock_res_and_lock(matched);
2548                         if (ldlm_extent_equal(&policy->l_extent,
2549                             &matched->l_policy_data.l_extent))
2550                                 rc = -EEXIST;
2551                         else
2552                                 rc = -ECANCELED;
2553                         unlock_res_and_lock(matched);
2554
2555                         ldlm_lock_decref(&lockh, mode);
2556                         LDLM_LOCK_PUT(matched);
2557                         RETURN(rc);
2558                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2559                         *flags |= LDLM_FL_LVB_READY;
2560
2561                         /* We already have a lock, and it's referenced. */
2562                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2563
2564                         ldlm_lock_decref(&lockh, mode);
2565                         LDLM_LOCK_PUT(matched);
2566                         RETURN(ELDLM_OK);
2567                 } else {
2568                         ldlm_lock_decref(&lockh, mode);
2569                         LDLM_LOCK_PUT(matched);
2570                 }
2571         }
2572
2573         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2574                 RETURN(-ENOLCK);
2575
2576         if (intent) {
2577                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2578                                            &RQF_LDLM_ENQUEUE_LVB);
2579                 if (req == NULL)
2580                         RETURN(-ENOMEM);
2581
2582                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2583                 if (rc) {
2584                         ptlrpc_request_free(req);
2585                         RETURN(rc);
2586                 }
2587
2588                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2589                                      sizeof(*lvb));
2590                 ptlrpc_request_set_replen(req);
2591         }
2592
2593         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2594         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2595
2596         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2597                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2598         if (async) {
2599                 if (!rc) {
2600                         struct osc_enqueue_args *aa;
2601                         aa = ptlrpc_req_async_args(aa, req);
2602                         aa->oa_exp         = exp;
2603                         aa->oa_mode        = einfo->ei_mode;
2604                         aa->oa_type        = einfo->ei_type;
2605                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2606                         aa->oa_upcall      = upcall;
2607                         aa->oa_cookie      = cookie;
2608                         aa->oa_speculative = speculative;
2609                         if (!speculative) {
2610                                 aa->oa_flags  = flags;
2611                                 aa->oa_lvb    = lvb;
2612                         } else {
2613                                 /* speculative locks essentially enqueue a DLM
2614                                  * lock in advance, so we don't care about the
2615                                  * result of the enqueue. */
2616                                 aa->oa_lvb    = NULL;
2617                                 aa->oa_flags  = NULL;
2618                         }
2619
2620                         req->rq_interpret_reply = osc_enqueue_interpret;
2621                         if (rqset == PTLRPCD_SET)
2622                                 ptlrpcd_add_req(req);
2623                         else
2624                                 ptlrpc_set_add_req(rqset, req);
2625                 } else if (intent) {
2626                         ptlrpc_req_finished(req);
2627                 }
2628                 RETURN(rc);
2629         }
2630
2631         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2632                               flags, speculative, rc);
2633         if (intent)
2634                 ptlrpc_req_finished(req);
2635
2636         RETURN(rc);
2637 }
2638
2639 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2640                    struct ldlm_res_id *res_id, enum ldlm_type type,
2641                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2642                    __u64 *flags, struct osc_object *obj,
2643                    struct lustre_handle *lockh, int unref)
2644 {
2645         struct obd_device *obd = exp->exp_obd;
2646         __u64 lflags = *flags;
2647         enum ldlm_mode rc;
2648         ENTRY;
2649
2650         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2651                 RETURN(-EIO);
2652
2653         /* Filesystem lock extents are extended to page boundaries so that
2654          * dealing with the page cache is a little smoother */
2655         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2656         policy->l_extent.end |= ~PAGE_MASK;
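        /* Worked example (assuming 4 KiB pages, so ~PAGE_MASK == 0xfff):
         * a match for bytes [5000, 6000] is widened to [4096, 8191]:
         * 5000 - (5000 & 0xfff) == 4096, and 6000 | 0xfff == 8191. */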
2657
2658         /* Next, search for already existing extent locks that will cover us.
2659          * If we're trying to read, we also search for an existing PW lock; the
2660          * VFS and page cache already protect us locally, so many readers or
2661          * writers can share a single PW lock. */
2662         rc = mode;
2663         if (mode == LCK_PR)
2664                 rc |= LCK_PW;
2665         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2666                              res_id, type, policy, rc, lockh, unref);
2667         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2668                 RETURN(rc);
2669
2670         if (obj != NULL) {
2671                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2672
2673                 LASSERT(lock != NULL);
2674                 if (osc_set_lock_data(lock, obj)) {
2675                         lock_res_and_lock(lock);
2676                         if (!ldlm_is_lvb_cached(lock)) {
2677                                 LASSERT(lock->l_ast_data == obj);
2678                                 osc_lock_lvb_update(env, obj, lock, NULL);
2679                                 ldlm_set_lvb_cached(lock);
2680                         }
2681                         unlock_res_and_lock(lock);
2682                 } else {
2683                         ldlm_lock_decref(lockh, rc);
2684                         rc = 0;
2685                 }
2686                 LDLM_LOCK_PUT(lock);
2687         }
2688         RETURN(rc);
2689 }
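
/* A hedged usage sketch (illustrative assumption, not called anywhere in
 * this file): probe for a cached extent lock covering a single byte before
 * deciding to enqueue a new one.  The osc_example_probe_lock() name is
 * invented; the flags, modes and page-boundary widening all come from
 * osc_match_base() above. */
static int __maybe_unused
osc_example_probe_lock(const struct lu_env *env, struct obd_export *exp,
                       struct ldlm_res_id *res_id, struct osc_object *obj,
                       __u64 offset)
{
        union ldlm_policy_data policy = {
                .l_extent = { .start = offset, .end = offset },
        };
        struct lustre_handle lockh;
        /* LDLM_FL_TEST_LOCK reports a match without taking a reference */
        __u64 flags = LDLM_FL_TEST_LOCK;

        /* osc_match_base() widens the extent to page boundaries itself and
         * also accepts a PW lock when PR is requested */
        return osc_match_base(env, exp, res_id, LDLM_EXTENT, &policy,
                              LCK_PR, &flags, obj, &lockh, 0);
}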
2690
2691 static int osc_statfs_interpret(const struct lu_env *env,
2692                                 struct ptlrpc_request *req, void *args, int rc)
2693 {
2694         struct osc_async_args *aa = args;
2695         struct obd_statfs *msfs;
2696
2697         ENTRY;
2698         if (rc == -EBADR)
2699                 /*
2700                  * The request has in fact never been sent due to issues at
2701                  * a higher level (LOV).  Exit immediately since the caller
2702                  * is aware of the problem and takes care of the clean up.
2703                  */
2704                 RETURN(rc);
2705
2706         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2707             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2708                 GOTO(out, rc = 0);
2709
2710         if (rc != 0)
2711                 GOTO(out, rc);
2712
2713         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2714         if (msfs == NULL)
2715                 GOTO(out, rc = -EPROTO);
2716
2717         *aa->aa_oi->oi_osfs = *msfs;
2718 out:
2719         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2720
2721         RETURN(rc);
2722 }
2723
2724 static int osc_statfs_async(struct obd_export *exp,
2725                             struct obd_info *oinfo, time64_t max_age,
2726                             struct ptlrpc_request_set *rqset)
2727 {
2728         struct obd_device     *obd = class_exp2obd(exp);
2729         struct ptlrpc_request *req;
2730         struct osc_async_args *aa;
2731         int rc;
2732         ENTRY;
2733
2734         if (obd->obd_osfs_age >= max_age) {
2735                 CDEBUG(D_SUPER,
2736                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2737                        obd->obd_name, &obd->obd_osfs,
2738                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2739                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2740                 spin_lock(&obd->obd_osfs_lock);
2741                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2742                 spin_unlock(&obd->obd_osfs_lock);
2743                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2744                 if (oinfo->oi_cb_up)
2745                         oinfo->oi_cb_up(oinfo, 0);
2746
2747                 RETURN(0);
2748         }
2749
2750         /* We could possibly pass max_age in the request (as an absolute
2751          * timestamp or a "seconds.usec ago") so the target can avoid doing
2752          * extra calls into the filesystem if that isn't necessary (e.g.
2753          * during mount, where it would help a bit).  Having relative timestamps
2754          * is not so great if request processing is slow, while absolute
2755          * timestamps are not ideal because they need time synchronization. */
2756         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2757         if (req == NULL)
2758                 RETURN(-ENOMEM);
2759
2760         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2761         if (rc) {
2762                 ptlrpc_request_free(req);
2763                 RETURN(rc);
2764         }
2765         ptlrpc_request_set_replen(req);
2766         req->rq_request_portal = OST_CREATE_PORTAL;
2767         ptlrpc_at_set_req_timeout(req);
2768
2769         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2770                 /* procfs requests must not wait on recovery, to avoid deadlock */
2771                 req->rq_no_resend = 1;
2772                 req->rq_no_delay = 1;
2773         }
2774
2775         req->rq_interpret_reply = osc_statfs_interpret;
2776         aa = ptlrpc_req_async_args(aa, req);
2777         aa->aa_oi = oinfo;
2778
2779         ptlrpc_set_add_req(rqset, req);
2780         RETURN(0);
2781 }
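
/* A hedged helper sketch (the name and wrapper are assumptions, unused in
 * this file): osc_statfs_async() treats max_age as an absolute cutoff, so
 * "accept statistics at most N seconds stale" translates into
 * ktime_get_seconds() - N. */
static int __maybe_unused
osc_example_statfs_maxstale(struct obd_export *exp, struct obd_info *oinfo,
                            time64_t max_stale,
                            struct ptlrpc_request_set *rqset)
{
        /* statistics cached at or after this timestamp are served from
         * obd->obd_osfs without sending an RPC */
        return osc_statfs_async(exp, oinfo,
                                ktime_get_seconds() - max_stale, rqset);
}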
2782
2783 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2784                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2785 {
2786         struct obd_device     *obd = class_exp2obd(exp);
2787         struct obd_statfs     *msfs;
2788         struct ptlrpc_request *req;
2789         struct obd_import     *imp = NULL;
2790         int rc;
2791         ENTRY;
2792
2793
2794         /* Since the request might also come from lprocfs, we need to
2795          * sync this with client_disconnect_export(); see Bug 15684. */
2796         down_read(&obd->u.cli.cl_sem);
2797         if (obd->u.cli.cl_import)
2798                 imp = class_import_get(obd->u.cli.cl_import);
2799         up_read(&obd->u.cli.cl_sem);
2800         if (!imp)
2801                 RETURN(-ENODEV);
2802
2803         /* We could possibly pass max_age in the request (as an absolute
2804          * timestamp or a "seconds.usec ago") so the target can avoid doing
2805          * extra calls into the filesystem if that isn't necessary (e.g.
2806          * during mount, where it would help a bit).  Having relative timestamps
2807          * is not so great if request processing is slow, while absolute
2808          * timestamps are not ideal because they need time synchronization. */
2809         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2810
2811         class_import_put(imp);
2812
2813         if (req == NULL)
2814                 RETURN(-ENOMEM);
2815
2816         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2817         if (rc) {
2818                 ptlrpc_request_free(req);
2819                 RETURN(rc);
2820         }
2821         ptlrpc_request_set_replen(req);
2822         req->rq_request_portal = OST_CREATE_PORTAL;
2823         ptlrpc_at_set_req_timeout(req);
2824
2825         if (flags & OBD_STATFS_NODELAY) {
2826                 /* procfs requests must not wait on recovery, to avoid deadlock */
2827                 req->rq_no_resend = 1;
2828                 req->rq_no_delay = 1;
2829         }
2830
2831         rc = ptlrpc_queue_wait(req);
2832         if (rc)
2833                 GOTO(out, rc);
2834
2835         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2836         if (msfs == NULL)
2837                 GOTO(out, rc = -EPROTO);
2838
2839         *osfs = *msfs;
2840
2841         EXIT;
2842 out:
2843         ptlrpc_req_finished(req);
2844         return rc;
2845 }
2846
2847 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2848                          void *karg, void __user *uarg)
2849 {
2850         struct obd_device *obd = exp->exp_obd;
2851         struct obd_ioctl_data *data = karg;
2852         int rc = 0;
2853
2854         ENTRY;
2855         if (!try_module_get(THIS_MODULE)) {
2856                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2857                        module_name(THIS_MODULE));
2858                 return -EINVAL;
2859         }
2860         switch (cmd) {
2861         case OBD_IOC_CLIENT_RECOVER:
2862                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2863                                            data->ioc_inlbuf1, 0);
2864                 if (rc > 0)
2865                         rc = 0;
2866                 break;
2867         case IOC_OSC_SET_ACTIVE:
2868                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2869                                               data->ioc_offset);
2870                 break;
2871         default:
2872                 rc = -ENOTTY;
2873                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2874                        obd->obd_name, cmd, current_comm(), rc);
2875                 break;
2876         }
2877
2878         module_put(THIS_MODULE);
2879         return rc;
2880 }
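
/* A hedged caller sketch (the helper name and the NID literal are
 * assumptions): driving import recovery through the ioctl path above.
 * ioc_inlbuf1 carries the UUID/NID string that ptlrpc_recover_import()
 * resolves. */
static int __maybe_unused osc_example_recover_import(struct obd_export *exp)
{
        struct obd_ioctl_data data = {
                .ioc_inlbuf1 = "192.168.0.100@tcp",     /* assumed target */
        };

        return osc_iocontrol(OBD_IOC_CLIENT_RECOVER, exp, sizeof(data),
                             &data, NULL);
}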
2881
2882 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2883                        u32 keylen, void *key, u32 vallen, void *val,
2884                        struct ptlrpc_request_set *set)
2885 {
2886         struct ptlrpc_request *req;
2887         struct obd_device     *obd = exp->exp_obd;
2888         struct obd_import     *imp = class_exp2cliimp(exp);
2889         char                  *tmp;
2890         int                    rc;
2891         ENTRY;
2892
2893         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2894
2895         if (KEY_IS(KEY_CHECKSUM)) {
2896                 if (vallen != sizeof(int))
2897                         RETURN(-EINVAL);
2898                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2899                 RETURN(0);
2900         }
2901
2902         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2903                 sptlrpc_conf_client_adapt(obd);
2904                 RETURN(0);
2905         }
2906
2907         if (KEY_IS(KEY_FLUSH_CTX)) {
2908                 sptlrpc_import_flush_my_ctx(imp);
2909                 RETURN(0);
2910         }
2911
2912         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2913                 struct client_obd *cli = &obd->u.cli;
2914                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2915                 long target = *(long *)val;
2916
2917                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2918                 *(long *)val -= nr;
2919                 RETURN(0);
2920         }
2921
2922         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2923                 RETURN(-EINVAL);
2924
2925         /* We pass all other commands directly to the OST. Since nobody calls
2926          * OSC methods directly and everybody is supposed to go through LOV,
2927          * we assume LOV has already rejected invalid values for us.
2928          * The only recognised values so far are evict_by_nid and mds_conn.
2929          * Even if something bad gets through, we'd get a -EINVAL from the
2930          * OST anyway. */
2931
2932         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2933                                                 &RQF_OST_SET_GRANT_INFO :
2934                                                 &RQF_OBD_SET_INFO);
2935         if (req == NULL)
2936                 RETURN(-ENOMEM);
2937
2938         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2939                              RCL_CLIENT, keylen);
2940         if (!KEY_IS(KEY_GRANT_SHRINK))
2941                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2942                                      RCL_CLIENT, vallen);
2943         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2944         if (rc) {
2945                 ptlrpc_request_free(req);
2946                 RETURN(rc);
2947         }
2948
2949         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2950         memcpy(tmp, key, keylen);
2951         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2952                                                         &RMF_OST_BODY :
2953                                                         &RMF_SETINFO_VAL);
2954         memcpy(tmp, val, vallen);
2955
2956         if (KEY_IS(KEY_GRANT_SHRINK)) {
2957                 struct osc_grant_args *aa;
2958                 struct obdo *oa;
2959
2960                 aa = ptlrpc_req_async_args(aa, req);
2961                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2962                 if (!oa) {
2963                         ptlrpc_req_finished(req);
2964                         RETURN(-ENOMEM);
2965                 }
2966                 *oa = ((struct ost_body *)val)->oa;
2967                 aa->aa_oa = oa;
2968                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2969         }
2970
2971         ptlrpc_request_set_replen(req);
2972         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2973                 LASSERT(set != NULL);
2974                 ptlrpc_set_add_req(set, req);
2975                 ptlrpc_check_set(NULL, set);
2976         } else {
2977                 ptlrpcd_add_req(req);
2978         }
2979
2980         RETURN(0);
2981 }
2982 EXPORT_SYMBOL(osc_set_info_async);
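
/* A hedged caller sketch (the helper name is an assumption): KEY_CHECKSUM
 * is handled locally in osc_set_info_async(), so no request set is needed;
 * only OST-bound keys such as KEY_GRANT_SHRINK build an RPC. */
static int __maybe_unused
osc_example_set_checksum(const struct lu_env *env, struct obd_export *exp,
                         bool enable)
{
        int val = enable ? 1 : 0;

        /* vallen must be sizeof(int), as checked at the top of
         * osc_set_info_async() */
        return osc_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
                                  KEY_CHECKSUM, sizeof(val), &val, NULL);
}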
2983
2984 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2985                   struct obd_device *obd, struct obd_uuid *cluuid,
2986                   struct obd_connect_data *data, void *localdata)
2987 {
2988         struct client_obd *cli = &obd->u.cli;
2989
2990         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2991                 long lost_grant;
2992                 long grant;
2993
2994                 spin_lock(&cli->cl_loi_list_lock);
2995                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2996                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2997                         /* restore ocd_grant_blkbits as client page bits */
2998                         data->ocd_grant_blkbits = PAGE_SHIFT;
2999                         grant += cli->cl_dirty_grant;
3000                 } else {
3001                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3002                 }
3003                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd); /* default: 2 BRW RPCs worth */
3004                 lost_grant = cli->cl_lost_grant;
3005                 cli->cl_lost_grant = 0;
3006                 spin_unlock(&cli->cl_loi_list_lock);
3007
3008                 CDEBUG(D_RPCTRACE,
3009                        "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3010                        data->ocd_connect_flags, data->ocd_version, data->ocd_grant, lost_grant);
3011         }
3012
3013         RETURN(0);
3014 }
3015 EXPORT_SYMBOL(osc_reconnect);
3016
3017 int osc_disconnect(struct obd_export *exp)
3018 {
3019         struct obd_device *obd = class_exp2obd(exp);
3020         int rc;
3021
3022         rc = client_disconnect_export(exp);
3023         /**
3024          * Initially we put del_shrink_grant before disconnect_export, but it
3025          * causes the following problem if setup (connect) and cleanup
3026          * (disconnect) are tangled together.
3027          *      connect p1                     disconnect p2
3028          *   ptlrpc_connect_import
3029          *     ...............               class_manual_cleanup
3030          *                                     osc_disconnect
3031          *                                     del_shrink_grant
3032          *   ptlrpc_connect_interrupt
3033          *     osc_init_grant
3034          *   add this client to shrink list
3035          *                                      cleanup_osc
3036          * Bang! The grant shrink thread triggers the shrink. BUG18662
3037          */
3038         osc_del_grant_list(&obd->u.cli);
3039         return rc;
3040 }
3041 EXPORT_SYMBOL(osc_disconnect);
3042
3043 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3044                                  struct hlist_node *hnode, void *arg)
3045 {
3046         struct lu_env *env = arg;
3047         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3048         struct ldlm_lock *lock;
3049         struct osc_object *osc = NULL;
3050         ENTRY;
3051
3052         lock_res(res);
3053         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3054                 if (lock->l_ast_data != NULL && osc == NULL) {
3055                         osc = lock->l_ast_data;
3056                         cl_object_get(osc2cl(osc));
3057                 }
3058
3059                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3060                  * by the 2nd round of ldlm_namespace_cleanup() in
3061                  * osc_import_event(). */
3062                 ldlm_clear_cleaned(lock);
3063         }
3064         unlock_res(res);
3065
3066         if (osc != NULL) {
3067                 osc_object_invalidate(env, osc);
3068                 cl_object_put(env, osc2cl(osc));
3069         }
3070
3071         RETURN(0);
3072 }
3073 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3074
3075 static int osc_import_event(struct obd_device *obd,
3076                             struct obd_import *imp,
3077                             enum obd_import_event event)
3078 {
3079         struct client_obd *cli;
3080         int rc = 0;
3081
3082         ENTRY;
3083         LASSERT(imp->imp_obd == obd);
3084
3085         switch (event) {
3086         case IMP_EVENT_DISCON: {
3087                 cli = &obd->u.cli;
3088                 spin_lock(&cli->cl_loi_list_lock);
3089                 cli->cl_avail_grant = 0;
3090                 cli->cl_lost_grant = 0;
3091                 spin_unlock(&cli->cl_loi_list_lock);
3092                 break;
3093         }
3094         case IMP_EVENT_INACTIVE: {
3095                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3096                 break;
3097         }
3098         case IMP_EVENT_INVALIDATE: {
3099                 struct ldlm_namespace *ns = obd->obd_namespace;
3100                 struct lu_env         *env;
3101                 __u16                  refcheck;
3102
3103                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3104
3105                 env = cl_env_get(&refcheck);
3106                 if (!IS_ERR(env)) {
3107                         osc_io_unplug(env, &obd->u.cli, NULL);
3108
3109                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3110                                                  osc_ldlm_resource_invalidate,
3111                                                  env, 0);
3112                         cl_env_put(env, &refcheck);
3113
3114                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3115                 } else
3116                         rc = PTR_ERR(env);
3117                 break;
3118         }
3119         case IMP_EVENT_ACTIVE: {
3120                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3121                 break;
3122         }
3123         case IMP_EVENT_OCD: {
3124                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3125
3126                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3127                         osc_init_grant(&obd->u.cli, ocd);
3128
3129                 /* See bug 7198 */
3130                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3131                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3132
3133                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3134                 break;
3135         }
3136         case IMP_EVENT_DEACTIVATE: {
3137                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3138                 break;
3139         }
3140         case IMP_EVENT_ACTIVATE: {
3141                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3142                 break;
3143         }
3144         default:
3145                 CERROR("Unknown import event %d\n", event);
3146                 LBUG();
3147         }
3148         RETURN(rc);
3149 }
3150
3151 /**
3152  * Determine whether the lock can be canceled before replaying the lock
3153  * during recovery, see bug16774 for detailed information.
3154  *
3155  * \retval zero the lock can't be canceled
3156  * \retval other ok to cancel
3157  */
3158 static int osc_cancel_weight(struct ldlm_lock *lock)
3159 {
3160         /*
3161          * Cancel all unused and granted extent locks.
3162          */
3163         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3164             ldlm_is_granted(lock) &&
3165             osc_ldlm_weigh_ast(lock) == 0)
3166                 RETURN(1);
3167
3168         RETURN(0);
3169 }
3170
3171 static int brw_queue_work(const struct lu_env *env, void *data)
3172 {
3173         struct client_obd *cli = data;
3174
3175         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3176
3177         osc_io_unplug(env, cli, NULL);
3178         RETURN(0);
3179 }
3180
3181 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3182 {
3183         struct client_obd *cli = &obd->u.cli;
3184         void *handler;
3185         int rc;
3186
3187         ENTRY;
3188
3189         rc = ptlrpcd_addref();
3190         if (rc)
3191                 RETURN(rc);
3192
3193         rc = client_obd_setup(obd, lcfg);
3194         if (rc)
3195                 GOTO(out_ptlrpcd, rc);
3196
3197
3198         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3199         if (IS_ERR(handler))
3200                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3201         cli->cl_writeback_work = handler;
3202
3203         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3204         if (IS_ERR(handler))
3205                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3206         cli->cl_lru_work = handler;
3207
3208         rc = osc_quota_setup(obd);
3209         if (rc)
3210                 GOTO(out_ptlrpcd_work, rc);
3211
3212         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3213         osc_update_next_shrink(cli);
3214
3215         RETURN(rc);
3216
3217 out_ptlrpcd_work:
3218         if (cli->cl_writeback_work != NULL) {
3219                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3220                 cli->cl_writeback_work = NULL;
3221         }
3222         if (cli->cl_lru_work != NULL) {
3223                 ptlrpcd_destroy_work(cli->cl_lru_work);
3224                 cli->cl_lru_work = NULL;
3225         }
3226         client_obd_cleanup(obd);
3227 out_ptlrpcd:
3228         ptlrpcd_decref();
3229         RETURN(rc);
3230 }
3231 EXPORT_SYMBOL(osc_setup_common);
3232
3233 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3234 {
3235         struct client_obd *cli = &obd->u.cli;
3236         int                adding;
3237         int                added;
3238         int                req_count;
3239         int                rc;
3240
3241         ENTRY;
3242
3243         rc = osc_setup_common(obd, lcfg);
3244         if (rc < 0)
3245                 RETURN(rc);
3246
3247         rc = osc_tunables_init(obd);
3248         if (rc)
3249                 RETURN(rc);
3250
3251         /*
3252          * We try to control the total number of requests with an upper limit,
3253          * osc_reqpool_maxreqcount. There might be some race which will cause
3254          * over-limit allocation, but it is fine.
3255          */
3256         req_count = atomic_read(&osc_pool_req_count);
3257         if (req_count < osc_reqpool_maxreqcount) {
3258                 adding = cli->cl_max_rpcs_in_flight + 2;
3259                 if (req_count + adding > osc_reqpool_maxreqcount)
3260                         adding = osc_reqpool_maxreqcount - req_count;
3261
3262                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3263                 atomic_add(added, &osc_pool_req_count);
3264         }
3265
3266         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3267
3268         spin_lock(&osc_shrink_lock);
3269         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3270         spin_unlock(&osc_shrink_lock);
3271         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3272         cli->cl_import->imp_idle_debug = D_HA;
3273
3274         RETURN(0);
3275 }
3276
3277 int osc_precleanup_common(struct obd_device *obd)
3278 {
3279         struct client_obd *cli = &obd->u.cli;
3280         ENTRY;
3281
3282         /* LU-464
3283          * for echo client, export may be on zombie list, wait for
3284          * zombie thread to cull it, because cli.cl_import will be
3285          * cleared in client_disconnect_export():
3286          *   class_export_destroy() -> obd_cleanup() ->
3287          *   echo_device_free() -> echo_client_cleanup() ->
3288          *   obd_disconnect() -> osc_disconnect() ->
3289          *   client_disconnect_export()
3290          */
3291         obd_zombie_barrier();
3292         if (cli->cl_writeback_work) {
3293                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3294                 cli->cl_writeback_work = NULL;
3295         }
3296
3297         if (cli->cl_lru_work) {
3298                 ptlrpcd_destroy_work(cli->cl_lru_work);
3299                 cli->cl_lru_work = NULL;
3300         }
3301
3302         obd_cleanup_client_import(obd);
3303         RETURN(0);
3304 }
3305 EXPORT_SYMBOL(osc_precleanup_common);
3306
3307 static int osc_precleanup(struct obd_device *obd)
3308 {
3309         ENTRY;
3310
3311         osc_precleanup_common(obd);
3312
3313         ptlrpc_lprocfs_unregister_obd(obd);
3314         RETURN(0);
3315 }
3316
3317 int osc_cleanup_common(struct obd_device *obd)
3318 {
3319         struct client_obd *cli = &obd->u.cli;
3320         int rc;
3321
3322         ENTRY;
3323
3324         spin_lock(&osc_shrink_lock);
3325         list_del(&cli->cl_shrink_list);
3326         spin_unlock(&osc_shrink_lock);
3327
3328         /* lru cleanup */
3329         if (cli->cl_cache != NULL) {
3330                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3331                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3332                 list_del_init(&cli->cl_lru_osc);
3333                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3334                 cli->cl_lru_left = NULL;
3335                 cl_cache_decref(cli->cl_cache);
3336                 cli->cl_cache = NULL;
3337         }
3338
3339         /* free memory of osc quota cache */
3340         osc_quota_cleanup(obd);
3341
3342         rc = client_obd_cleanup(obd);
3343
3344         ptlrpcd_decref();
3345         RETURN(rc);
3346 }
3347 EXPORT_SYMBOL(osc_cleanup_common);
3348
3349 static const struct obd_ops osc_obd_ops = {
3350         .o_owner                = THIS_MODULE,
3351         .o_setup                = osc_setup,
3352         .o_precleanup           = osc_precleanup,
3353         .o_cleanup              = osc_cleanup_common,
3354         .o_add_conn             = client_import_add_conn,
3355         .o_del_conn             = client_import_del_conn,
3356         .o_connect              = client_connect_import,
3357         .o_reconnect            = osc_reconnect,
3358         .o_disconnect           = osc_disconnect,
3359         .o_statfs               = osc_statfs,
3360         .o_statfs_async         = osc_statfs_async,
3361         .o_create               = osc_create,
3362         .o_destroy              = osc_destroy,
3363         .o_getattr              = osc_getattr,
3364         .o_setattr              = osc_setattr,
3365         .o_iocontrol            = osc_iocontrol,
3366         .o_set_info_async       = osc_set_info_async,
3367         .o_import_event         = osc_import_event,
3368         .o_quotactl             = osc_quotactl,
3369 };
3370
3371 static struct shrinker *osc_cache_shrinker;
3372 LIST_HEAD(osc_shrink_list);
3373 DEFINE_SPINLOCK(osc_shrink_lock);
3374
3375 #ifndef HAVE_SHRINKER_COUNT
3376 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3377 {
3378         struct shrink_control scv = {
3379                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3380                 .gfp_mask   = shrink_param(sc, gfp_mask)
3381         };
3382         (void)osc_cache_shrink_scan(shrinker, &scv);
3383
3384         return osc_cache_shrink_count(shrinker, &scv);
3385 }
3386 #endif
3387
3388 static int __init osc_init(void)
3389 {
3390         unsigned int reqpool_size;
3391         unsigned int reqsize;
3392         int rc;
3393         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3394                          osc_cache_shrink_count, osc_cache_shrink_scan);
3395         ENTRY;
3396
3397         /* print an address of _any_ initialized kernel symbol from this
3398          * module, to allow debugging with gdb that doesn't support data
3399          * symbols from modules. */
3400         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3401
3402         rc = lu_kmem_init(osc_caches);
3403         if (rc)
3404                 RETURN(rc);
3405
3406         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3407                                  LUSTRE_OSC_NAME, &osc_device_type);
3408         if (rc)
3409                 GOTO(out_kmem, rc);
3410
3411         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3412
3413         /* This is obviously too much memory, only prevent overflow here */
3414         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3415                 GOTO(out_type, rc = -EINVAL);
3416
3417         reqpool_size = osc_reqpool_mem_max << 20;
3418
3419         reqsize = 1;
3420         while (reqsize < OST_IO_MAXREQSIZE)
3421                 reqsize <<= 1;  /* round up to the next power of two */
3422
3423         /*
3424          * We don't enlarge the request count in OSC pool according to
3425          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3426          * tried after normal allocation failed. So a small OSC pool won't
3427          * cause much performance degradation in most cases.
3428          */
3429         osc_reqpool_maxreqcount = reqpool_size / reqsize;
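        /* Worked example under assumed numbers: if OST_IO_MAXREQSIZE rounds
         * up to 1 MiB above, the default osc_reqpool_mem_max of 5 MiB caps
         * the pool at 5 requests. */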
3430
3431         atomic_set(&osc_pool_req_count, 0);
3432         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3433                                           ptlrpc_add_rqs_to_pool);
3434
3435         if (osc_rq_pool == NULL)
3436                 GOTO(out_type, rc = -ENOMEM);
3437
3438         rc = osc_start_grant_work();
3439         if (rc != 0)
3440                 GOTO(out_req_pool, rc);
3441
3442         RETURN(rc);
3443
3444 out_req_pool:
3445         ptlrpc_free_rq_pool(osc_rq_pool);
3446 out_type:
3447         class_unregister_type(LUSTRE_OSC_NAME);
3448 out_kmem:
3449         lu_kmem_fini(osc_caches);
3450
3451         RETURN(rc);
3452 }
3453
3454 static void __exit osc_exit(void)
3455 {
3456         osc_stop_grant_work();
3457         remove_shrinker(osc_cache_shrinker);
3458         class_unregister_type(LUSTRE_OSC_NAME);
3459         lu_kmem_fini(osc_caches);
3460         ptlrpc_free_rq_pool(osc_rq_pool);
3461 }
3462
3463 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3464 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3465 MODULE_VERSION(LUSTRE_VERSION_STRING);
3466 MODULE_LICENSE("GPL");
3467
3468 module_init(osc_init);
3469 module_exit(osc_exit);