lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

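/* Pack @oa into the OST_BODY field of @req, converting it to the wire
 * format dictated by the import's negotiated connect data. */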
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

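/* Reply callback for asynchronous setattr-style requests: on success,
 * unpack the returned obdo into sa_oa, then hand the final status to the
 * caller's upcall. */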
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

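/* Send an OST_PUNCH request through ptlrpcd without waiting for the reply;
 * completion is reported to @upcall by osc_setattr_interpret(). */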
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

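/* Send an OST_SYNC request for @obj. The sync range is carried in the
 * overloaded size/blocks fields of @oa; osc_sync_interpret() updates the
 * object's blocks attribute and invokes @upcall on completion. */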
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matching @mode in the resource derived
 * from @oa. Matched locks are added to the @cancels list. Returns the
 * number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all,
         * in which we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

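/* Try to reserve a slot for a destroy RPC: returns 1 if the in-flight
 * count stays within cl_max_rpcs_in_flight, otherwise backs out the
 * increment (waking a waiter if the count raced downward) and returns 0. */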
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

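/* Fill the dirty, undirty, grant and dropped fields of @oa so that the
 * client's cache and grant accounting is piggybacked to the server on an
 * outgoing request. */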
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_long_read() and the atomic_long_inc() are not
                 * covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

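/* A client should shrink its grant only if grant shrinking was negotiated
 * at connect time, the import is fully connected, the shrink interval has
 * (nearly) expired, and more grant is held than a single RPC would use. */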
static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

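/* Periodic work: walk the registered clients, send up to
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, then re-arm the delayed
 * work for the earliest pending cl_next_shrink_grant. */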
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler for returning grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

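/* Validate the per-niobuf return codes of a BRW_WRITE reply: fail on any
 * negative rc, and treat unexpected positive rcs or a short bulk transfer
 * as protocol errors. */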
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

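/* Two brw_pages can share one niobuf only if their flags match (modulo
 * flags known to be safe to mix) and they are contiguous in file offset. */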
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
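/* T10-PI bulk checksum: generate the per-sector DIF guard tags for each
 * page, then hash the accumulated guard tags with the top-level algorithm
 * to produce the final checksum. */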
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots must be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

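/* Plain bulk checksum: hash the page data directly with the algorithm
 * selected by @cksum_type. */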
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

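/* Compute the bulk checksum, using the T10-PI path when @cksum_type maps
 * to a DIF checksum function and the plain bulk checksum otherwise. */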
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

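/* Build a BRW (bulk read/write) RPC: merge contiguous pages into niobufs,
 * choose between short I/O and a bulk descriptor, pack the body/ioobj/
 * niobuf buffers, piggyback cached-dirty/grant state, and checksum the
 * write data when checksums are enabled. */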
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because these RPCs are asynchronous. Fortunately,
         * the oa passed in contains valid o_uid and o_gid for these two
         * operations, and filling o_uid and o_gid is enough for nrs-tbf, see
         * LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set in order to
         * avoid breaking other process logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
1405         if (desc != NULL)
1406                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1407         else /* short io */
1408                 ioobj_max_brw_set(ioobj, 0);
1409
1410         if (short_io_size != 0) {
1411                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1412                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1413                         body->oa.o_flags = 0;
1414                 }
1415                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1416                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1417                        short_io_size);
1418                 if (opc == OST_WRITE) {
1419                         short_io_buf = req_capsule_client_get(pill,
1420                                                               &RMF_SHORT_IO);
1421                         LASSERT(short_io_buf != NULL);
1422                 }
1423         }
1424
1425         LASSERT(page_count > 0);
1426         pg_prev = pga[0];
1427         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1428                 struct brw_page *pg = pga[i];
1429                 int poff = pg->off & ~PAGE_MASK;
1430
1431                 LASSERT(pg->count > 0);
1432                 /* make sure there is no gap in the middle of page array */
1433                 LASSERTF(page_count == 1 ||
1434                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1435                           ergo(i > 0 && i < page_count - 1,
1436                                poff == 0 && pg->count == PAGE_SIZE)   &&
1437                           ergo(i == page_count - 1, poff == 0)),
1438                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1439                          i, page_count, pg, pg->off, pg->count);
1440                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1441                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1442                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1443                          i, page_count,
1444                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1445                          pg_prev->pg, page_private(pg_prev->pg),
1446                          pg_prev->pg->index, pg_prev->off);
1447                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1448                         (pg->flag & OBD_BRW_SRVLOCK));
1449                 if (short_io_size != 0 && opc == OST_WRITE) {
1450                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1451
1452                         LASSERT(short_io_size >= requested_nob + pg->count);
1453                         memcpy(short_io_buf + requested_nob,
1454                                ptr + poff,
1455                                pg->count);
1456                         ll_kunmap_atomic(ptr, KM_USER0);
1457                 } else if (short_io_size == 0) {
1458                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1459                                                          pg->count);
1460                 }
1461                 requested_nob += pg->count;
1462
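                /* Editor's note: contiguous pages are coalesced into a single
                 * remote niobuf below, which is why the final number of
                 * niobufs (niocount) can be smaller than page_count. */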
1463                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1464                         niobuf--;
1465                         niobuf->rnb_len += pg->count;
1466                 } else {
1467                         niobuf->rnb_offset = pg->off;
1468                         niobuf->rnb_len    = pg->count;
1469                         niobuf->rnb_flags  = pg->flag;
1470                 }
1471                 pg_prev = pg;
1472         }
1473
1474         LASSERTF((void *)(niobuf - niocount) ==
1475                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1476                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1477                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1478
1479         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1480         if (resend) {
1481                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1482                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1483                         body->oa.o_flags = 0;
1484                 }
1485                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1486         }
1487
1488         if (osc_should_shrink_grant(cli))
1489                 osc_shrink_grant_local(cli, &body->oa);
1490
1491         /* size[REQ_REC_OFF] is still sizeof(*body) */
1492         if (opc == OST_WRITE) {
1493                 if (cli->cl_checksum &&
1494                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1495                         /* store cl_cksum_type in a local variable since
1496                          * it can be changed via lprocfs */
1497                         enum cksum_types cksum_type = cli->cl_cksum_type;
1498
1499                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1500                                 body->oa.o_flags = 0;
1501
1502                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1503                                                                 cksum_type);
1504                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1505
1506                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1507                                                   requested_nob, page_count,
1508                                                   pga, OST_WRITE,
1509                                                   &body->oa.o_cksum);
1510                         if (rc < 0) {
1511                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1512                                        rc);
1513                                 GOTO(out, rc);
1514                         }
1515                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1516                                body->oa.o_cksum);
1517
1518                         /* save this in 'oa', too, for later checking */
1519                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1520                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1521                                                            cksum_type);
1522                 } else {
1523                         /* clear out the checksum flag, in case this is a
1524                          * resend but cl_checksum is no longer set. b=11238 */
1525                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1526                 }
1527                 oa->o_cksum = body->oa.o_cksum;
1528                 /* 1 RC per niobuf */
1529                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1530                                      sizeof(__u32) * niocount);
1531         } else {
1532                 if (cli->cl_checksum &&
1533                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1534                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1535                                 body->oa.o_flags = 0;
1536                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1537                                 cli->cl_cksum_type);
1538                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1539                 }
1540
1541                 /* The client cksum has already been copied to the wire obdo
1542                  * by the earlier lustre_set_wire_obdo(); if a bulk read is
1543                  * resent due to a cksum error, this lets the server
1544                  * check+dump the pages on its side */
1545         }
1546         ptlrpc_request_set_replen(req);
1547
1548         aa = ptlrpc_req_async_args(aa, req);
1549         aa->aa_oa = oa;
1550         aa->aa_requested_nob = requested_nob;
1551         aa->aa_nio_count = niocount;
1552         aa->aa_page_count = page_count;
1553         aa->aa_resends = 0;
1554         aa->aa_ppga = pga;
1555         aa->aa_cli = cli;
1556         INIT_LIST_HEAD(&aa->aa_oaps);
1557
1558         *reqp = req;
1559         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1560         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1561                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1562                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1563         RETURN(0);
1564
1565  out:
1566         ptlrpc_req_finished(req);
1567         RETURN(rc);
1568 }
1569
1570 char dbgcksum_file_name[PATH_MAX];
1571
1572 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1573                                 struct brw_page **pga, __u32 server_cksum,
1574                                 __u32 client_cksum)
1575 {
1576         struct file *filp;
1577         int rc, i;
1578         unsigned int len;
1579         char *buf;
1580
1581         /* only keep a dump of the pages on the first error for a given range
1582          * in the file/fid, not on resends/retries. */
1583         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1584                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1585                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1586                   libcfs_debug_file_path_arr :
1587                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1591                  pga[0]->off,
1592                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1593                  client_cksum, server_cksum);
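        /*
         * Editor's example (hypothetical values): with the default debug path
         * this yields a name along the lines of
         *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-c0ffee11-deadbeef
         * i.e. debug path, parent FID, byte range, then the client and server
         * checksums.
         */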
1594         filp = filp_open(dbgcksum_file_name,
1595                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1596         if (IS_ERR(filp)) {
1597                 rc = PTR_ERR(filp);
1598                 if (rc == -EEXIST)
1599                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1600                                "checksum error: rc = %d\n", dbgcksum_file_name,
1601                                rc);
1602                 else
1603                         CERROR("%s: can't open to dump pages with checksum "
1604                                "error: rc = %d\n", dbgcksum_file_name, rc);
1605                 return;
1606         }
1607
1608         for (i = 0; i < page_count; i++) {
1609                 len = pga[i]->count;
1610                 buf = kmap(pga[i]->pg);
1611                 while (len != 0) {
1612                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1613                         if (rc < 0) {
1614                                 CERROR("%s: wanted to write %u but got %d "
1615                                        "error\n", dbgcksum_file_name, len, rc);
1616                                 break;
1617                         }
1618                         len -= rc;
1619                         buf += rc;
1620                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1621                                dbgcksum_file_name, rc);
1622                 }
1623                 kunmap(pga[i]->pg);
1624         }
1625
1626         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1627         if (rc)
1628                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1629         filp_close(filp, NULL);
1630         return;
1631 }
1632
1633 static int
1634 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1635                      __u32 client_cksum, __u32 server_cksum,
1636                      struct osc_brw_async_args *aa)
1637 {
1638         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1639         enum cksum_types cksum_type;
1640         obd_dif_csum_fn *fn = NULL;
1641         int sector_size = 0;
1642         __u32 new_cksum;
1643         char *msg;
1644         int rc;
1645
1646         if (server_cksum == client_cksum) {
1647                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1648                 return 0;
1649         }
1650
1651         if (aa->aa_cli->cl_checksum_dump)
1652                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1653                                     server_cksum, client_cksum);
1654
1655         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1656                                            oa->o_flags : 0);
1657
1658         switch (cksum_type) {
1659         case OBD_CKSUM_T10IP512:
1660                 fn = obd_dif_ip_fn;
1661                 sector_size = 512;
1662                 break;
1663         case OBD_CKSUM_T10IP4K:
1664                 fn = obd_dif_ip_fn;
1665                 sector_size = 4096;
1666                 break;
1667         case OBD_CKSUM_T10CRC512:
1668                 fn = obd_dif_crc_fn;
1669                 sector_size = 512;
1670                 break;
1671         case OBD_CKSUM_T10CRC4K:
1672                 fn = obd_dif_crc_fn;
1673                 sector_size = 4096;
1674                 break;
1675         default:
1676                 break;
1677         }
1678
1679         if (fn)
1680                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1681                                              aa->aa_page_count, aa->aa_ppga,
1682                                              OST_WRITE, fn, sector_size,
1683                                              &new_cksum);
1684         else
1685                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1686                                        aa->aa_ppga, OST_WRITE, cksum_type,
1687                                        &new_cksum);
1688
1689         if (rc < 0)
1690                 msg = "failed to calculate the client write checksum";
1691         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1692                 msg = "the server did not use the checksum type specified in "
1693                       "the original request - likely a protocol problem";
1694         else if (new_cksum == server_cksum)
1695                 msg = "changed on the client after we checksummed it - "
1696                       "likely false positive due to mmap IO (bug 11742)";
1697         else if (new_cksum == client_cksum)
1698                 msg = "changed in transit before arrival at OST";
1699         else
1700                 msg = "changed in transit AND doesn't match the original - "
1701                       "likely false positive due to mmap IO (bug 11742)";
1702
1703         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1704                            DFID " object "DOSTID" extent [%llu-%llu], original "
1705                            "client csum %x (type %x), server csum %x (type %x),"
1706                            " client csum now %x\n",
1707                            obd_name, msg, libcfs_nid2str(peer->nid),
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1711                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1712                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1713                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1714                            client_cksum,
1715                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1716                            server_cksum, cksum_type, new_cksum);
1717         return 1;
1718 }
1719
1720 /* Note rc enters this function as the number of bytes transferred */
1721 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1722 {
1723         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1724         struct client_obd *cli = aa->aa_cli;
1725         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1726         const struct lnet_process_id *peer =
1727                 &req->rq_import->imp_connection->c_peer;
1728         struct ost_body *body;
1729         u32 client_cksum = 0;
1730
1731         ENTRY;
1732
1733         if (rc < 0 && rc != -EDQUOT) {
1734                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1735                 RETURN(rc);
1736         }
1737
1738         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1739         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1740         if (body == NULL) {
1741                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1742                 RETURN(-EPROTO);
1743         }
1744
1745         /* set/clear over quota flag for a uid/gid/projid */
1746         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1747             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1748                 unsigned qid[LL_MAXQUOTAS] = {
1749                                          body->oa.o_uid, body->oa.o_gid,
1750                                          body->oa.o_projid };
1751                 CDEBUG(D_QUOTA,
1752                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1753                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1754                        body->oa.o_valid, body->oa.o_flags);
1755                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1756                                 body->oa.o_flags);
1757         }
1758
1759         osc_update_grant(cli, body);
1760
1761         if (rc < 0)
1762                 RETURN(rc);
1763
1764         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1765                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1766
1767         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1768                 if (rc > 0) {
1769                         CERROR("%s: unexpected positive size %d\n",
1770                                obd_name, rc);
1771                         RETURN(-EPROTO);
1772                 }
1773
1774                 if (req->rq_bulk != NULL &&
1775                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1776                         RETURN(-EAGAIN);
1777
1778                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1779                     check_write_checksum(&body->oa, peer, client_cksum,
1780                                          body->oa.o_cksum, aa))
1781                         RETURN(-EAGAIN);
1782
1783                 rc = check_write_rcs(req, aa->aa_requested_nob,
1784                                      aa->aa_nio_count, aa->aa_page_count,
1785                                      aa->aa_ppga);
1786                 GOTO(out, rc);
1787         }
1788
1789         /* The rest of this function executes only for OST_READs */
1790
1791         if (req->rq_bulk == NULL) {
1792                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1793                                           RCL_SERVER);
1794                 LASSERT(rc == req->rq_status);
1795         } else {
1796                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1797                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1798         }
1799         if (rc < 0)
1800                 GOTO(out, rc = -EAGAIN);
1801
1802         if (rc > aa->aa_requested_nob) {
1803                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1804                        rc, aa->aa_requested_nob);
1805                 RETURN(-EPROTO);
1806         }
1807
1808         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1809                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1810                        rc, req->rq_bulk->bd_nob_transferred);
1811                 RETURN(-EPROTO);
1812         }
1813
1814         if (req->rq_bulk == NULL) {
1815                 /* short io */
1816                 int nob, pg_count, i = 0;
1817                 unsigned char *buf;
1818
1819                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1820                 pg_count = aa->aa_page_count;
1821                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1822                                                    rc);
1823                 nob = rc;
1824                 while (nob > 0 && pg_count > 0) {
1825                         unsigned char *ptr;
1826                         int count = aa->aa_ppga[i]->count > nob ?
1827                                     nob : aa->aa_ppga[i]->count;
1828
1829                         CDEBUG(D_CACHE, "page %p count %d\n",
1830                                aa->aa_ppga[i]->pg, count);
1831                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1832                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1833                                count);
1834                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1835
1836                         buf += count;
1837                         nob -= count;
1838                         i++;
1839                         pg_count--;
1840                 }
1841         }
1842
1843         if (rc < aa->aa_requested_nob)
1844                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1845
1846         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1847                 static int cksum_counter;
1848                 u32        server_cksum = body->oa.o_cksum;
1849                 char      *via = "";
1850                 char      *router = "";
1851                 enum cksum_types cksum_type;
1852                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1853                         body->oa.o_flags : 0;
1854
1855                 cksum_type = obd_cksum_type_unpack(o_flags);
1856                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1857                                           aa->aa_page_count, aa->aa_ppga,
1858                                           OST_READ, &client_cksum);
1859                 if (rc < 0)
1860                         GOTO(out, rc);
1861
1862                 if (req->rq_bulk != NULL &&
1863                     peer->nid != req->rq_bulk->bd_sender) {
1864                         via = " via ";
1865                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1866                 }
1867
1868                 if (server_cksum != client_cksum) {
1869                         struct ost_body *clbody;
1870                         u32 page_count = aa->aa_page_count;
1871
1872                         clbody = req_capsule_client_get(&req->rq_pill,
1873                                                         &RMF_OST_BODY);
1874                         if (cli->cl_checksum_dump)
1875                                 dump_all_bulk_pages(&clbody->oa, page_count,
1876                                                     aa->aa_ppga, server_cksum,
1877                                                     client_cksum);
1878
1879                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1880                                            "%s%s%s inode "DFID" object "DOSTID
1881                                            " extent [%llu-%llu], client %x, "
1882                                            "server %x, cksum_type %x\n",
1883                                            obd_name,
1884                                            libcfs_nid2str(peer->nid),
1885                                            via, router,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_seq : 0ULL,
1888                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1889                                                 clbody->oa.o_parent_oid : 0,
1890                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1891                                                 clbody->oa.o_parent_ver : 0,
1892                                            POSTID(&body->oa.o_oi),
1893                                            aa->aa_ppga[0]->off,
1894                                            aa->aa_ppga[page_count-1]->off +
1895                                            aa->aa_ppga[page_count-1]->count - 1,
1896                                            client_cksum, server_cksum,
1897                                            cksum_type);
1898                         cksum_counter = 0;
1899                         aa->aa_oa->o_cksum = client_cksum;
1900                         rc = -EAGAIN;
1901                 } else {
1902                         cksum_counter++;
1903                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1904                         rc = 0;
1905                 }
1906         } else if (unlikely(client_cksum)) {
1907                 static int cksum_missed;
1908
1909                 cksum_missed++;
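                /* Editor's note: x & -x == x exactly when x is a power of
                 * two, so the CERROR below fires on the 1st, 2nd, 4th, 8th,
                 * ... miss instead of on every one. */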
1910                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1911                         CERROR("%s: checksum %u requested from %s but not sent\n",
1912                                obd_name, cksum_missed,
1913                                libcfs_nid2str(peer->nid));
1914         } else {
1915                 rc = 0;
1916         }
1917 out:
1918         if (rc >= 0)
1919                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1920                                      aa->aa_oa, &body->oa);
1921
1922         RETURN(rc);
1923 }
1924
1925 static int osc_brw_redo_request(struct ptlrpc_request *request,
1926                                 struct osc_brw_async_args *aa, int rc)
1927 {
1928         struct ptlrpc_request *new_req;
1929         struct osc_brw_async_args *new_aa;
1930         struct osc_async_page *oap;
1931         ENTRY;
1932
1933         /* The message below is checked in replay-ost-single.sh test_8ae */
1934         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1935                   "redo for recoverable error %d", rc);
1936
1937         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1938                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1939                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1940                                   aa->aa_ppga, &new_req, 1);
1941         if (rc)
1942                 RETURN(rc);
1943
1944         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1945                 if (oap->oap_request != NULL) {
1946                         LASSERTF(request == oap->oap_request,
1947                                  "request %p != oap_request %p\n",
1948                                  request, oap->oap_request);
1949                         if (oap->oap_interrupted) {
1950                                 ptlrpc_req_finished(new_req);
1951                                 RETURN(-EINTR);
1952                         }
1953                 }
1954         }
1955         /*
1956          * New request takes over pga and oaps from old request.
1957          * Note that copying a list_head doesn't work, need to move it...
1958          */
1959         aa->aa_resends++;
1960         new_req->rq_interpret_reply = request->rq_interpret_reply;
1961         new_req->rq_async_args = request->rq_async_args;
1962         new_req->rq_commit_cb = request->rq_commit_cb;
1963         /* cap the resend delay at the current request timeout, i.e. wait
1964          * min(aa_resends, rq_timeout) seconds, as ptlrpc does (see after_reply()) */
1965         if (aa->aa_resends > new_req->rq_timeout)
1966                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1967         else
1968                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1969         new_req->rq_generation_set = 1;
1970         new_req->rq_import_generation = request->rq_import_generation;
1971
1972         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1973
1974         INIT_LIST_HEAD(&new_aa->aa_oaps);
1975         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1976         INIT_LIST_HEAD(&new_aa->aa_exts);
1977         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1978         new_aa->aa_resends = aa->aa_resends;
1979
1980         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1981                 if (oap->oap_request) {
1982                         ptlrpc_req_finished(oap->oap_request);
1983                         oap->oap_request = ptlrpc_request_addref(new_req);
1984                 }
1985         }
1986
1987         /* XXX: This code will run into problems if we ever support adding
1988          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1989          * waiting for all of them to finish. We should inherit the request
1990          * set from the old request. */
1991         ptlrpcd_add_req(new_req);
1992
1993         DEBUG_REQ(D_INFO, new_req, "new request");
1994         RETURN(0);
1995 }
1996
1997 /*
1998  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1999  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2000  * fine for our small page arrays and doesn't require allocation.  It's an
2001  * insertion sort that swaps elements that are strides apart, shrinking the
2002  * stride down until it's 1 and the array is sorted.
2003  */
2004 static void sort_brw_pages(struct brw_page **array, int num)
2005 {
2006         int stride, i, j;
2007         struct brw_page *tmp;
2008
2009         if (num == 1)
2010                 return;
2011         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2012                 ;
2013
2014         do {
2015                 stride /= 3;
2016                 for (i = stride ; i < num ; i++) {
2017                         tmp = array[i];
2018                         j = i;
2019                         while (j >= stride && array[j - stride]->off > tmp->off) {
2020                                 array[j] = array[j - stride];
2021                                 j -= stride;
2022                         }
2023                         array[j] = tmp;
2024                 }
2025         } while (stride > 1);
2026 }
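/*
 * Editor's example: for num = 10 the stride loop above stops at 13
 * (1 -> 4 -> 13 >= num), and the do/while then sorts with gaps 4 and 1 --
 * the classic 3h+1 gap sequence.
 */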
2027
2028 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2029 {
2030         LASSERT(ppga != NULL);
2031         OBD_FREE(ppga, sizeof(*ppga) * count);
2032 }
2033
2034 static int brw_interpret(const struct lu_env *env,
2035                          struct ptlrpc_request *req, void *args, int rc)
2036 {
2037         struct osc_brw_async_args *aa = args;
2038         struct osc_extent *ext;
2039         struct osc_extent *tmp;
2040         struct client_obd *cli = aa->aa_cli;
2041         unsigned long transferred = 0;
2042
2043         ENTRY;
2044
2045         rc = osc_brw_fini_request(req, rc);
2046         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2047         /*
2048          * When server returns -EINPROGRESS, client should always retry
2049          * regardless of the number of times the bulk was resent already.
2050          */
2051         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2052                 if (req->rq_import_generation !=
2053                     req->rq_import->imp_generation) {
2054                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2055                                ""DOSTID", rc = %d.\n",
2056                                req->rq_import->imp_obd->obd_name,
2057                                POSTID(&aa->aa_oa->o_oi), rc);
2058                 } else if (rc == -EINPROGRESS ||
2059                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2060                         rc = osc_brw_redo_request(req, aa, rc);
2061                 } else {
2062                         CERROR("%s: too many resent retries for object: "
2063                                "%llu:%llu, rc = %d.\n",
2064                                req->rq_import->imp_obd->obd_name,
2065                                POSTID(&aa->aa_oa->o_oi), rc);
2066                 }
2067
2068                 if (rc == 0)
2069                         RETURN(0);
2070                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2071                         rc = -EIO;
2072         }
2073
2074         if (rc == 0) {
2075                 struct obdo *oa = aa->aa_oa;
2076                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2077                 unsigned long valid = 0;
2078                 struct cl_object *obj;
2079                 struct osc_async_page *last;
2080
2081                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2082                 obj = osc2cl(last->oap_obj);
2083
2084                 cl_object_attr_lock(obj);
2085                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2086                         attr->cat_blocks = oa->o_blocks;
2087                         valid |= CAT_BLOCKS;
2088                 }
2089                 if (oa->o_valid & OBD_MD_FLMTIME) {
2090                         attr->cat_mtime = oa->o_mtime;
2091                         valid |= CAT_MTIME;
2092                 }
2093                 if (oa->o_valid & OBD_MD_FLATIME) {
2094                         attr->cat_atime = oa->o_atime;
2095                         valid |= CAT_ATIME;
2096                 }
2097                 if (oa->o_valid & OBD_MD_FLCTIME) {
2098                         attr->cat_ctime = oa->o_ctime;
2099                         valid |= CAT_CTIME;
2100                 }
2101
2102                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2103                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2104                         loff_t last_off = last->oap_count + last->oap_obj_off +
2105                                 last->oap_page_off;
2106
2107                         /* Change the file size if this is an out-of-quota or
2108                          * direct IO write and it extends the file size */
2109                         if (loi->loi_lvb.lvb_size < last_off) {
2110                                 attr->cat_size = last_off;
2111                                 valid |= CAT_SIZE;
2112                         }
2113                         /* Extend KMS if it's not a lockless write */
2114                         if (loi->loi_kms < last_off &&
2115                             oap2osc_page(last)->ops_srvlock == 0) {
2116                                 attr->cat_kms = last_off;
2117                                 valid |= CAT_KMS;
2118                         }
2119                 }
2120
2121                 if (valid != 0)
2122                         cl_object_attr_update(env, obj, attr, valid);
2123                 cl_object_attr_unlock(obj);
2124         }
2125         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2126
2127         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2128                 osc_inc_unstable_pages(req);
2129
2130         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2131                 list_del_init(&ext->oe_link);
2132                 osc_extent_finish(env, ext, 1,
2133                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2134         }
2135         LASSERT(list_empty(&aa->aa_exts));
2136         LASSERT(list_empty(&aa->aa_oaps));
2137
2138         transferred = (req->rq_bulk == NULL ? /* short io */
2139                        aa->aa_requested_nob :
2140                        req->rq_bulk->bd_nob_transferred);
2141
2142         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2143         ptlrpc_lprocfs_brw(req, transferred);
2144
2145         spin_lock(&cli->cl_loi_list_lock);
2146         /* We need to decrement the in-flight counters before
2147          * osc_ap_completion->osc_wake_cache_waiters is called, so we know
2148          * whether to go to sync BRWs or wait for more RPCs to complete */
2149         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2150                 cli->cl_w_in_flight--;
2151         else
2152                 cli->cl_r_in_flight--;
2153         osc_wake_cache_waiters(cli);
2154         spin_unlock(&cli->cl_loi_list_lock);
2155
2156         osc_io_unplug(env, cli, NULL);
2157         RETURN(rc);
2158 }
2159
2160 static void brw_commit(struct ptlrpc_request *req)
2161 {
2162         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2163          * this function being called via rq_commit_cb, we need to ensure
2164          * osc_dec_unstable_pages is still called. Otherwise unstable
2165          * pages may be leaked. */
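        /* Editor's note (an assumption about the counterpart path): if this
         * callback wins the race, only rq_committed is set below, and
         * osc_inc_unstable_pages is expected to observe it under rq_lock and
         * undo its own accounting; if the increment wins, rq_unstable is set
         * and the decrement happens here instead. */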
2166         spin_lock(&req->rq_lock);
2167         if (likely(req->rq_unstable)) {
2168                 req->rq_unstable = 0;
2169                 spin_unlock(&req->rq_lock);
2170
2171                 osc_dec_unstable_pages(req);
2172         } else {
2173                 req->rq_committed = 1;
2174                 spin_unlock(&req->rq_lock);
2175         }
2176 }
2177
2178 /**
2179  * Build an RPC from the list of extents @ext_list. The caller must ensure
2180  * that the total number of pages in this list does not exceed the maximum
2181  * pages per RPC. Extents in the list must be in OES_RPC state.
2182  */
2183 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2184                   struct list_head *ext_list, int cmd)
2185 {
2186         struct ptlrpc_request           *req = NULL;
2187         struct osc_extent               *ext;
2188         struct brw_page                 **pga = NULL;
2189         struct osc_brw_async_args       *aa = NULL;
2190         struct obdo                     *oa = NULL;
2191         struct osc_async_page           *oap;
2192         struct osc_object               *obj = NULL;
2193         struct cl_req_attr              *crattr = NULL;
2194         loff_t                          starting_offset = OBD_OBJECT_EOF;
2195         loff_t                          ending_offset = 0;
2196         int                             mpflag = 0;
2197         int                             mem_tight = 0;
2198         int                             page_count = 0;
2199         bool                            soft_sync = false;
2200         bool                            interrupted = false;
2201         bool                            ndelay = false;
2202         int                             i;
2203         int                             grant = 0;
2204         int                             rc;
2205         __u32                           layout_version = 0;
2206         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2207         struct ost_body                 *body;
2208         ENTRY;
2209         LASSERT(!list_empty(ext_list));
2210
2211         /* add pages into rpc_list to build BRW rpc */
2212         list_for_each_entry(ext, ext_list, oe_link) {
2213                 LASSERT(ext->oe_state == OES_RPC);
2214                 mem_tight |= ext->oe_memalloc;
2215                 grant += ext->oe_grants;
2216                 page_count += ext->oe_nr_pages;
2217                 layout_version = MAX(layout_version, ext->oe_layout_version);
2218                 if (obj == NULL)
2219                         obj = ext->oe_obj;
2220         }
2221
2222         soft_sync = osc_over_unstable_soft_limit(cli);
2223         if (mem_tight)
2224                 mpflag = cfs_memory_pressure_get_and_set();
2225
2226         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2227         if (pga == NULL)
2228                 GOTO(out, rc = -ENOMEM);
2229
2230         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2231         if (oa == NULL)
2232                 GOTO(out, rc = -ENOMEM);
2233
2234         i = 0;
2235         list_for_each_entry(ext, ext_list, oe_link) {
2236                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2237                         if (mem_tight)
2238                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2239                         if (soft_sync)
2240                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2241                         pga[i] = &oap->oap_brw_page;
2242                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2243                         i++;
2244
2245                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2246                         if (starting_offset == OBD_OBJECT_EOF ||
2247                             starting_offset > oap->oap_obj_off)
2248                                 starting_offset = oap->oap_obj_off;
2249                         else
2250                                 LASSERT(oap->oap_page_off == 0);
2251                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2252                                 ending_offset = oap->oap_obj_off +
2253                                                 oap->oap_count;
2254                         else
2255                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2256                                         PAGE_SIZE);
2257                         if (oap->oap_interrupted)
2258                                 interrupted = true;
2259                 }
2260                 if (ext->oe_ndelay)
2261                         ndelay = true;
2262         }
2263
2264         /* first page in the list */
2265         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2266
2267         crattr = &osc_env_info(env)->oti_req_attr;
2268         memset(crattr, 0, sizeof(*crattr));
2269         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2270         crattr->cra_flags = ~0ULL;
2271         crattr->cra_page = oap2cl_page(oap);
2272         crattr->cra_oa = oa;
2273         cl_req_attr_set(env, osc2cl(obj), crattr);
2274
2275         if (cmd == OBD_BRW_WRITE) {
2276                 oa->o_grant_used = grant;
2277                 if (layout_version > 0) {
2278                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2279                                PFID(&oa->o_oi.oi_fid), layout_version);
2280
2281                         oa->o_layout_version = layout_version;
2282                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2283                 }
2284         }
2285
2286         sort_brw_pages(pga, page_count);
2287         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2288         if (rc != 0) {
2289                 CERROR("prep_req failed: %d\n", rc);
2290                 GOTO(out, rc);
2291         }
2292
2293         req->rq_commit_cb = brw_commit;
2294         req->rq_interpret_reply = brw_interpret;
2295         req->rq_memalloc = mem_tight != 0;
2296         oap->oap_request = ptlrpc_request_addref(req);
2297         if (interrupted && !req->rq_intr)
2298                 ptlrpc_mark_interrupted(req);
2299         if (ndelay) {
2300                 req->rq_no_resend = req->rq_no_delay = 1;
2301                 /* We should probably set a shorter timeout value here to
2302                  * handle ETIMEDOUT in brw_interpret() correctly. */
2303                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2304         }
2305
2306         /* Need to update the timestamps after the request is built in case
2307          * we race with setattr (locally or in the queue at the OST).  If the
2308          * OST gets the later setattr before the earlier BRW (as determined by
2309          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2310          * there is no obvious way to do this in a single call.  bug 10150 */
2311         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2312         crattr->cra_oa = &body->oa;
2313         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2314         cl_req_attr_set(env, osc2cl(obj), crattr);
2315         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2316
2317         aa = ptlrpc_req_async_args(aa, req);
2318         INIT_LIST_HEAD(&aa->aa_oaps);
2319         list_splice_init(&rpc_list, &aa->aa_oaps);
2320         INIT_LIST_HEAD(&aa->aa_exts);
2321         list_splice_init(ext_list, &aa->aa_exts);
2322
2323         spin_lock(&cli->cl_loi_list_lock);
2324         starting_offset >>= PAGE_SHIFT;
2325         if (cmd == OBD_BRW_READ) {
2326                 cli->cl_r_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2330                                       starting_offset + 1);
2331         } else {
2332                 cli->cl_w_in_flight++;
2333                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2334                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2335                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2336                                       starting_offset + 1);
2337         }
2338         spin_unlock(&cli->cl_loi_list_lock);
2339
2340         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2341                   page_count, aa, cli->cl_r_in_flight,
2342                   cli->cl_w_in_flight);
2343         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2344
2345         ptlrpcd_add_req(req);
2346         rc = 0;
2347         EXIT;
2348
2349 out:
2350         if (mem_tight != 0)
2351                 cfs_memory_pressure_restore(mpflag);
2352
2353         if (rc != 0) {
2354                 LASSERT(req == NULL);
2355
2356                 if (oa)
2357                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2358                 if (pga)
2359                         OBD_FREE(pga, sizeof(*pga) * page_count);
2360                 /* this should happen rarely and is pretty bad; it makes the
2361                  * pending list not follow the dirty order */
2362                 while (!list_empty(ext_list)) {
2363                         ext = list_entry(ext_list->next, struct osc_extent,
2364                                          oe_link);
2365                         list_del_init(&ext->oe_link);
2366                         osc_extent_finish(env, ext, 0, rc);
2367                 }
2368         }
2369         RETURN(rc);
2370 }
2371
2372 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2373 {
2374         int set = 0;
2375
2376         LASSERT(lock != NULL);
2377
2378         lock_res_and_lock(lock);
2379
2380         if (lock->l_ast_data == NULL)
2381                 lock->l_ast_data = data;
2382         if (lock->l_ast_data == data)
2383                 set = 1;
2384
2385         unlock_res_and_lock(lock);
2386
2387         return set;
2388 }
2389
2390 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2391                      void *cookie, struct lustre_handle *lockh,
2392                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2393                      int errcode)
2394 {
2395         bool intent = *flags & LDLM_FL_HAS_INTENT;
2396         int rc;
2397         ENTRY;
2398
2399         /* The request was created before the ldlm_cli_enqueue() call. */
2400         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2401                 struct ldlm_reply *rep;
2402
2403                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2404                 LASSERT(rep != NULL);
2405
2406                 rep->lock_policy_res1 =
2407                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2408                 if (rep->lock_policy_res1)
2409                         errcode = rep->lock_policy_res1;
2410                 if (!speculative)
2411                         *flags |= LDLM_FL_LVB_READY;
2412         } else if (errcode == ELDLM_OK) {
2413                 *flags |= LDLM_FL_LVB_READY;
2414         }
2415
2416         /* Call the update callback. */
2417         rc = (*upcall)(cookie, lockh, errcode);
2418
2419         /* release the reference taken in ldlm_cli_enqueue() */
2420         if (errcode == ELDLM_LOCK_MATCHED)
2421                 errcode = ELDLM_OK;
2422         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2423                 ldlm_lock_decref(lockh, mode);
2424
2425         RETURN(rc);
2426 }
2427
2428 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2429                           void *args, int rc)
2430 {
2431         struct osc_enqueue_args *aa = args;
2432         struct ldlm_lock *lock;
2433         struct lustre_handle *lockh = &aa->oa_lockh;
2434         enum ldlm_mode mode = aa->oa_mode;
2435         struct ost_lvb *lvb = aa->oa_lvb;
2436         __u32 lvb_len = sizeof(*lvb);
2437         __u64 flags = 0;
2438
2439         ENTRY;
2440
2441         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2442          * be valid. */
2443         lock = ldlm_handle2lock(lockh);
2444         LASSERTF(lock != NULL,
2445                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2446                  lockh->cookie, req, aa);
2447
2448         /* Take an additional reference so that a blocking AST that
2449          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2450          * to arrive after an upcall has been executed by
2451          * osc_enqueue_fini(). */
2452         ldlm_lock_addref(lockh, mode);
2453
2454         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2455         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2456
2457         /* Let the CP AST grant the lock first. */
2458         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2459
2460         if (aa->oa_speculative) {
2461                 LASSERT(aa->oa_lvb == NULL);
2462                 LASSERT(aa->oa_flags == NULL);
2463                 aa->oa_flags = &flags;
2464         }
2465
2466         /* Complete obtaining the lock procedure. */
2467         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2468                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2469                                    lockh, rc);
2470         /* Complete osc stuff. */
2471         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2472                               aa->oa_flags, aa->oa_speculative, rc);
2473
2474         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2475
2476         ldlm_lock_decref(lockh, mode);
2477         LDLM_LOCK_PUT(lock);
2478         RETURN(rc);
2479 }
2480
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
2483 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2484  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2485  * other synchronous requests; however, holding some locks while trying to
2486  * obtain others may take a considerable amount of time in the case of OST
2487  * failure, and when a client does not release a lock that other sync requests
2488  * are waiting for, that client is evicted from the cluster -- such scenarios
2489  * make life difficult, so release locks just after they are obtained. */
2490 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2491                      __u64 *flags, union ldlm_policy_data *policy,
2492                      struct ost_lvb *lvb, int kms_valid,
2493                      osc_enqueue_upcall_f upcall, void *cookie,
2494                      struct ldlm_enqueue_info *einfo,
2495                      struct ptlrpc_request_set *rqset, int async,
2496                      bool speculative)
2497 {
2498         struct obd_device *obd = exp->exp_obd;
2499         struct lustre_handle lockh = { 0 };
2500         struct ptlrpc_request *req = NULL;
2501         int intent = *flags & LDLM_FL_HAS_INTENT;
2502         __u64 match_flags = *flags;
2503         enum ldlm_mode mode;
2504         int rc;
2505         ENTRY;
2506
2507         /* Filesystem lock extents are extended to page boundaries so that
2508          * dealing with the page cache is a little smoother.  */
2509         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2510         policy->l_extent.end |= ~PAGE_MASK;
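        /* Editor's example: with a 4096-byte PAGE_SIZE a request for bytes
         * [5000, 9000] is widened to [4096, 12287], i.e. whole pages. */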
2511
2512         /*
2513          * kms is not valid when either the object is completely fresh (so
2514          * that no locks are cached), or the object was evicted. In the latter
2515          * case a cached lock cannot be used, because it would prime the inode
2516          * state with a potentially stale LVB.
2517          */
2518         if (!kms_valid)
2519                 goto no_match;
2520
2521         /* Next, search for already existing extent locks that will cover us */
2522         /* If we're trying to read, we also search for an existing PW lock.  The
2523          * VFS and page cache already protect us locally, so lots of readers/
2524          * writers can share a single PW lock.
2525          *
2526          * There are problems with conversion deadlocks, so instead of
2527          * converting a read lock to a write lock, we'll just enqueue a new
2528          * one.
2529          *
2530          * At some point we should cancel the read lock instead of making them
2531          * send us a blocking callback, but there are problems with canceling
2532          * locks out from other users right now, too. */
2533         mode = einfo->ei_mode;
2534         if (einfo->ei_mode == LCK_PR)
2535                 mode |= LCK_PW;
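        /* Editor's note: mode is a bitmask here, so a PR request passes
         * LCK_PR|LCK_PW to ldlm_lock_match() below and an existing PW lock
         * can satisfy a read. */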
2536         /* Normal lock requests must wait for the LVB to be ready before
2537          * matching a lock; speculative lock requests do not need to,
2538          * because they will not actually use the lock. */
2539         if (!speculative)
2540                 match_flags |= LDLM_FL_LVB_READY;
2541         if (intent != 0)
2542                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2543         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2544                                einfo->ei_type, policy, mode, &lockh, 0);
2545         if (mode) {
2546                 struct ldlm_lock *matched;
2547
2548                 if (*flags & LDLM_FL_TEST_LOCK)
2549                         RETURN(ELDLM_OK);
2550
2551                 matched = ldlm_handle2lock(&lockh);
2552                 if (speculative) {
2553                         /* This DLM lock request is speculative, and does not
2554                          * have an associated IO request. Therefore if there
2555                          * is already a DLM lock, it will just inform the
2556                          * caller to cancel the request for this stripe. */
2557                         lock_res_and_lock(matched);
2558                         if (ldlm_extent_equal(&policy->l_extent,
2559                             &matched->l_policy_data.l_extent))
2560                                 rc = -EEXIST;
2561                         else
2562                                 rc = -ECANCELED;
2563                         unlock_res_and_lock(matched);
2564
2565                         ldlm_lock_decref(&lockh, mode);
2566                         LDLM_LOCK_PUT(matched);
2567                         RETURN(rc);
2568                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2569                         *flags |= LDLM_FL_LVB_READY;
2570
2571                         /* We already have a lock, and it's referenced. */
2572                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2573
2574                         ldlm_lock_decref(&lockh, mode);
2575                         LDLM_LOCK_PUT(matched);
2576                         RETURN(ELDLM_OK);
2577                 } else {
2578                         ldlm_lock_decref(&lockh, mode);
2579                         LDLM_LOCK_PUT(matched);
2580                 }
2581         }
2582
2583 no_match:
2584         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2585                 RETURN(-ENOLCK);
2586
2587         if (intent) {
2588                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2589                                            &RQF_LDLM_ENQUEUE_LVB);
2590                 if (req == NULL)
2591                         RETURN(-ENOMEM);
2592
2593                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2594                 if (rc) {
2595                         ptlrpc_request_free(req);
2596                         RETURN(rc);
2597                 }
2598
2599                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2600                                      sizeof(*lvb));
2601                 ptlrpc_request_set_replen(req);
2602         }
2603
2604         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2605         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2606
2607         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2608                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2609         if (async) {
2610                 if (!rc) {
2611                         struct osc_enqueue_args *aa;
2612                         aa = ptlrpc_req_async_args(aa, req);
2613                         aa->oa_exp         = exp;
2614                         aa->oa_mode        = einfo->ei_mode;
2615                         aa->oa_type        = einfo->ei_type;
2616                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2617                         aa->oa_upcall      = upcall;
2618                         aa->oa_cookie      = cookie;
2619                         aa->oa_speculative = speculative;
2620                         if (!speculative) {
2621                                 aa->oa_flags  = flags;
2622                                 aa->oa_lvb    = lvb;
2623                         } else {
2624                                 /* speculative locks essentially enqueue a DLM
2625                                  * lock in advance, so we don't care about the
2626                                  * result of the enqueue. */
2627                                 aa->oa_lvb    = NULL;
2628                                 aa->oa_flags  = NULL;
2629                         }
2630
2631                         req->rq_interpret_reply = osc_enqueue_interpret;
2632                         if (rqset == PTLRPCD_SET)
2633                                 ptlrpcd_add_req(req);
2634                         else
2635                                 ptlrpc_set_add_req(rqset, req);
2636                 } else if (intent) {
2637                         ptlrpc_req_finished(req);
2638                 }
2639                 RETURN(rc);
2640         }
2641
2642         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2643                               flags, speculative, rc);
2644         if (intent)
2645                 ptlrpc_req_finished(req);
2646
2647         RETURN(rc);
2648 }
2649
2650 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2651                    enum ldlm_type type, union ldlm_policy_data *policy,
2652                    enum ldlm_mode mode, __u64 *flags, void *data,
2653                    struct lustre_handle *lockh, int unref)
2654 {
2655         struct obd_device *obd = exp->exp_obd;
2656         __u64 lflags = *flags;
2657         enum ldlm_mode rc;
2658         ENTRY;
2659
2660         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2661                 RETURN(-EIO);
2662
2663         /* Filesystem lock extents are extended to page boundaries so that
2664          * dealing with the page cache is a little smoother */
2665         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2666         policy->l_extent.end |= ~PAGE_MASK;
2667
2668         /* Next, search for already existing extent locks that will cover us.
2669          * If we're trying to read, we also search for an existing PW lock.  The
2670          * VFS and page cache already protect us locally, so lots of readers/
2671          * writers can share a single PW lock. */
2672         rc = mode;
2673         if (mode == LCK_PR)
2674                 rc |= LCK_PW;
2675         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2676                              res_id, type, policy, rc, lockh, unref);
2677         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2678                 RETURN(rc);
2679
2680         if (data != NULL) {
2681                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2682
2683                 LASSERT(lock != NULL);
2684                 if (!osc_set_lock_data(lock, data)) {
2685                         ldlm_lock_decref(lockh, rc);
2686                         rc = 0;
2687                 }
2688                 LDLM_LOCK_PUT(lock);
2689         }
2690         RETURN(rc);
2691 }
2692
2693 static int osc_statfs_interpret(const struct lu_env *env,
2694                                 struct ptlrpc_request *req, void *args, int rc)
2695 {
2696         struct osc_async_args *aa = args;
2697         struct obd_statfs *msfs;
2698
2699         ENTRY;
2700         if (rc == -EBADR)
2701                 /*
2702                  * The request has in fact never been sent due to issues at
2703                  * a higher level (LOV).  Exit immediately since the caller
2704                  * is aware of the problem and takes care of the clean up.
2705                  */
2706                 RETURN(rc);
2707
2708         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2709             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2710                 GOTO(out, rc = 0);
2711
2712         if (rc != 0)
2713                 GOTO(out, rc);
2714
2715         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2716         if (msfs == NULL)
2717                 GOTO(out, rc = -EPROTO);
2718
2719         *aa->aa_oi->oi_osfs = *msfs;
2720 out:
2721         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2722
2723         RETURN(rc);
2724 }
2725
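/*
 * Asynchronous statfs: if the cached obd_osfs is no older than \a max_age,
 * answer from the cache and invoke oi_cb_up directly; otherwise send an
 * OST_STATFS request via \a rqset and let osc_statfs_interpret() deliver
 * the result to the callback.
 */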
2726 static int osc_statfs_async(struct obd_export *exp,
2727                             struct obd_info *oinfo, time64_t max_age,
2728                             struct ptlrpc_request_set *rqset)
2729 {
2730         struct obd_device     *obd = class_exp2obd(exp);
2731         struct ptlrpc_request *req;
2732         struct osc_async_args *aa;
2733         int rc;
2734         ENTRY;
2735
2736         if (obd->obd_osfs_age >= max_age) {
2737                 CDEBUG(D_SUPER,
2738                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2739                        obd->obd_name, &obd->obd_osfs,
2740                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2741                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2742                 spin_lock(&obd->obd_osfs_lock);
2743                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2744                 spin_unlock(&obd->obd_osfs_lock);
2745                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2746                 if (oinfo->oi_cb_up)
2747                         oinfo->oi_cb_up(oinfo, 0);
2748
2749                 RETURN(0);
2750         }
2751
2752         /* We could possibly pass max_age in the request (as an absolute
2753          * timestamp or a "seconds.usec ago") so the target can avoid doing
2754          * extra calls into the filesystem if that isn't necessary (e.g.
2755          * during mount that would help a bit).  Having relative timestamps
2756          * is not so great if request processing is slow, while absolute
2757          * timestamps are not ideal because they need time synchronization. */
2758         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2759         if (req == NULL)
2760                 RETURN(-ENOMEM);
2761
2762         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2763         if (rc) {
2764                 ptlrpc_request_free(req);
2765                 RETURN(rc);
2766         }
2767         ptlrpc_request_set_replen(req);
2768         req->rq_request_portal = OST_CREATE_PORTAL;
2769         ptlrpc_at_set_req_timeout(req);
2770
2771         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2772                 /* procfs requests must not block or resend, to avoid deadlock */
2773                 req->rq_no_resend = 1;
2774                 req->rq_no_delay = 1;
2775         }
2776
2777         req->rq_interpret_reply = osc_statfs_interpret;
2778         aa = ptlrpc_req_async_args(aa, req);
2779         aa->aa_oi = oinfo;
2780
2781         ptlrpc_set_add_req(rqset, req);
2782         RETURN(0);
2783 }
2784
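/*
 * Synchronous statfs.  Unlike osc_statfs_async(), \a max_age is not
 * consulted here: the OST is always queried and the reply copied into
 * \a osfs.
 */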
2785 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2786                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2787 {
2788         struct obd_device     *obd = class_exp2obd(exp);
2789         struct obd_statfs     *msfs;
2790         struct ptlrpc_request *req;
2791         struct obd_import     *imp = NULL;
2792         int rc;
2793         ENTRY;
2794
2796         /* Since the request might also come from lprocfs, we need to
2797          * sync this with client_disconnect_export(); see Bug15684. */
2798         down_read(&obd->u.cli.cl_sem);
2799         if (obd->u.cli.cl_import)
2800                 imp = class_import_get(obd->u.cli.cl_import);
2801         up_read(&obd->u.cli.cl_sem);
2802         if (!imp)
2803                 RETURN(-ENODEV);
2804
2805         /* We could possibly pass max_age in the request (as an absolute
2806          * timestamp or a "seconds.usec ago") so the target can avoid doing
2807          * extra calls into the filesystem if that isn't necessary (e.g.
2808          * during mount that would help a bit).  Having relative timestamps
2809          * is not so great if request processing is slow, while absolute
2810          * timestamps are not ideal because they need time synchronization. */
2811         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2812
2813         class_import_put(imp);
2814
2815         if (req == NULL)
2816                 RETURN(-ENOMEM);
2817
2818         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2819         if (rc) {
2820                 ptlrpc_request_free(req);
2821                 RETURN(rc);
2822         }
2823         ptlrpc_request_set_replen(req);
2824         req->rq_request_portal = OST_CREATE_PORTAL;
2825         ptlrpc_at_set_req_timeout(req);
2826
2827         if (flags & OBD_STATFS_NODELAY) {
2828                 /* procfs requests must not block or resend, to avoid deadlock */
2829                 req->rq_no_resend = 1;
2830                 req->rq_no_delay = 1;
2831         }
2832
2833         rc = ptlrpc_queue_wait(req);
2834         if (rc)
2835                 GOTO(out, rc);
2836
2837         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2838         if (msfs == NULL)
2839                 GOTO(out, rc = -EPROTO);
2840
2841         *osfs = *msfs;
2842
2843         EXIT;
2844 out:
2845         ptlrpc_req_finished(req);
2846         return rc;
2847 }
2848
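/*
 * Handle the ioctls the OSC implements itself (import recovery and
 * activation); everything else fails with -ENOTTY.  A module reference
 * is held across the handler so the OSC cannot be unloaded mid-ioctl.
 */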
2849 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2850                          void *karg, void __user *uarg)
2851 {
2852         struct obd_device *obd = exp->exp_obd;
2853         struct obd_ioctl_data *data = karg;
2854         int rc = 0;
2855
2856         ENTRY;
2857         if (!try_module_get(THIS_MODULE)) {
2858                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2859                        module_name(THIS_MODULE));
2860                 return -EINVAL;
2861         }
2862         switch (cmd) {
2863         case OBD_IOC_CLIENT_RECOVER:
2864                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2865                                            data->ioc_inlbuf1, 0);
2866                 if (rc > 0)
2867                         rc = 0;
2868                 break;
2869         case IOC_OSC_SET_ACTIVE:
2870                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2871                                               data->ioc_offset);
2872                 break;
2873         default:
2874                 rc = -ENOTTY;
2875                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2876                        obd->obd_name, cmd, current_comm(), rc);
2877                 break;
2878         }
2879
2880         module_put(THIS_MODULE);
2881         return rc;
2882 }
2883
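/*
 * Handle obd_set_info() keys for the OSC.  Checksum, sptlrpc, context
 * flush and LRU shrink keys are handled locally; any other key is packed
 * into an OST_SET_INFO RPC (using RQF_OST_SET_GRANT_INFO for
 * KEY_GRANT_SHRINK) and sent to the OST.  Grant shrink requests go
 * through ptlrpcd; all other RPCs require the caller to pass a request
 * \a set.
 */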
2884 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2885                        u32 keylen, void *key, u32 vallen, void *val,
2886                        struct ptlrpc_request_set *set)
2887 {
2888         struct ptlrpc_request *req;
2889         struct obd_device     *obd = exp->exp_obd;
2890         struct obd_import     *imp = class_exp2cliimp(exp);
2891         char                  *tmp;
2892         int                    rc;
2893         ENTRY;
2894
2895         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2896
2897         if (KEY_IS(KEY_CHECKSUM)) {
2898                 if (vallen != sizeof(int))
2899                         RETURN(-EINVAL);
2900                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2901                 RETURN(0);
2902         }
2903
2904         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2905                 sptlrpc_conf_client_adapt(obd);
2906                 RETURN(0);
2907         }
2908
2909         if (KEY_IS(KEY_FLUSH_CTX)) {
2910                 sptlrpc_import_flush_my_ctx(imp);
2911                 RETURN(0);
2912         }
2913
2914         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2915                 struct client_obd *cli = &obd->u.cli;
2916                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2917                 long target = *(long *)val;
2918
2919                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2920                 *(long *)val -= nr;
2921                 RETURN(0);
2922         }
2923
2924         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2925                 RETURN(-EINVAL);
2926
2927         /* We pass all other commands directly to the OST. Since nobody calls
2928          * OSC methods directly and everybody is supposed to go through LOV,
2929          * we assume LOV checked invalid values for us.
2930          * The only recognised values so far are evict_by_nid and mds_conn.
2931          * Even if something bad goes through, we'd get a -EINVAL from the
2932          * OST anyway. */
2933
2934         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2935                                                 &RQF_OST_SET_GRANT_INFO :
2936                                                 &RQF_OBD_SET_INFO);
2937         if (req == NULL)
2938                 RETURN(-ENOMEM);
2939
2940         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2941                              RCL_CLIENT, keylen);
2942         if (!KEY_IS(KEY_GRANT_SHRINK))
2943                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2944                                      RCL_CLIENT, vallen);
2945         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2946         if (rc) {
2947                 ptlrpc_request_free(req);
2948                 RETURN(rc);
2949         }
2950
2951         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2952         memcpy(tmp, key, keylen);
2953         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2954                                                         &RMF_OST_BODY :
2955                                                         &RMF_SETINFO_VAL);
2956         memcpy(tmp, val, vallen);
2957
2958         if (KEY_IS(KEY_GRANT_SHRINK)) {
2959                 struct osc_grant_args *aa;
2960                 struct obdo *oa;
2961
2962                 aa = ptlrpc_req_async_args(aa, req);
2963                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2964                 if (!oa) {
2965                         ptlrpc_req_finished(req);
2966                         RETURN(-ENOMEM);
2967                 }
2968                 *oa = ((struct ost_body *)val)->oa;
2969                 aa->aa_oa = oa;
2970                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2971         }
2972
2973         ptlrpc_request_set_replen(req);
2974         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2975                 LASSERT(set != NULL);
2976                 ptlrpc_set_add_req(set, req);
2977                 ptlrpc_check_set(NULL, set);
2978         } else {
2979                 ptlrpcd_add_req(req);
2980         }
2981
2982         RETURN(0);
2983 }
2984 EXPORT_SYMBOL(osc_set_info_async);
2985
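/*
 * Recompute the grant to request in the reconnect RPC: the grant still
 * available plus what is reserved and dirty, falling back to twice the
 * BRW size if the client holds none.  cl_lost_grant is sampled and reset
 * under cl_loi_list_lock so it is only logged once.
 */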
2986 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2987                   struct obd_device *obd, struct obd_uuid *cluuid,
2988                   struct obd_connect_data *data, void *localdata)
2989 {
2990         struct client_obd *cli = &obd->u.cli;
2991
2992         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2993                 long lost_grant;
2994                 long grant;
2995
2996                 spin_lock(&cli->cl_loi_list_lock);
2997                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2998                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2999                         /* restore ocd_grant_blkbits as client page bits */
3000                         data->ocd_grant_blkbits = PAGE_SHIFT;
3001                         grant += cli->cl_dirty_grant;
3002                 } else {
3003                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3004                 }
3005                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3006                 lost_grant = cli->cl_lost_grant;
3007                 cli->cl_lost_grant = 0;
3008                 spin_unlock(&cli->cl_loi_list_lock);
3009
3010                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3011                        data->ocd_connect_flags, data->ocd_version,
3012                        data->ocd_grant, lost_grant);
3013         }
3014
3015         RETURN(0);
3016 }
3017 EXPORT_SYMBOL(osc_reconnect);
3018
3019 int osc_disconnect(struct obd_export *exp)
3020 {
3021         struct obd_device *obd = class_exp2obd(exp);
3022         int rc;
3023
3024         rc = client_disconnect_export(exp);
3025         /**
3026          * Initially we put del_shrink_grant before disconnect_export, but it
3027          * causes the following problem if setup (connect) and cleanup
3028          * (disconnect) are tangled together.
3029          *      connect p1                     disconnect p2
3030          *   ptlrpc_connect_import
3031          *     ...............               class_manual_cleanup
3032          *                                     osc_disconnect
3033          *                                     del_shrink_grant
3034          *   ptlrpc_connect_interpret
3035          *     osc_init_grant
3036          *   add this client to shrink list
3037          *                                      cleanup_osc
3038          * Bang! the grant shrink thread triggers the shrink. BUG18662
3039          */
3040         osc_del_grant_list(&obd->u.cli);
3041         return rc;
3042 }
3043 EXPORT_SYMBOL(osc_disconnect);
3044
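/*
 * cfs_hash iterator callback: for every LDLM resource, clear the CLEANED
 * flag on all granted locks so that the next ldlm_namespace_cleanup()
 * pass will cancel them, and invalidate the osc_object the locks are
 * attached to, if any.
 */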
3045 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3046                                  struct hlist_node *hnode, void *arg)
3047 {
3048         struct lu_env *env = arg;
3049         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3050         struct ldlm_lock *lock;
3051         struct osc_object *osc = NULL;
3052         ENTRY;
3053
3054         lock_res(res);
3055         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3056                 if (lock->l_ast_data != NULL && osc == NULL) {
3057                         osc = lock->l_ast_data;
3058                         cl_object_get(osc2cl(osc));
3059                 }
3060
3061                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3062                  * by the second round of ldlm_namespace_cleanup() in
3063                  * osc_import_event(). */
3064                 ldlm_clear_cleaned(lock);
3065         }
3066         unlock_res(res);
3067
3068         if (osc != NULL) {
3069                 osc_object_invalidate(env, osc);
3070                 cl_object_put(env, osc2cl(osc));
3071         }
3072
3073         RETURN(0);
3074 }
3075 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3076
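/*
 * React to import state changes: drop the grant on disconnect, flush
 * cached pages and locks on invalidation, (re)initialize the grant from
 * the connect data on IMP_EVENT_OCD, and forward the events to the obd
 * observer (typically the LOV) where one is registered.
 */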
3077 static int osc_import_event(struct obd_device *obd,
3078                             struct obd_import *imp,
3079                             enum obd_import_event event)
3080 {
3081         struct client_obd *cli;
3082         int rc = 0;
3083
3084         ENTRY;
3085         LASSERT(imp->imp_obd == obd);
3086
3087         switch (event) {
3088         case IMP_EVENT_DISCON: {
3089                 cli = &obd->u.cli;
3090                 spin_lock(&cli->cl_loi_list_lock);
3091                 cli->cl_avail_grant = 0;
3092                 cli->cl_lost_grant = 0;
3093                 spin_unlock(&cli->cl_loi_list_lock);
3094                 break;
3095         }
3096         case IMP_EVENT_INACTIVE: {
3097                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3098                 break;
3099         }
3100         case IMP_EVENT_INVALIDATE: {
3101                 struct ldlm_namespace *ns = obd->obd_namespace;
3102                 struct lu_env         *env;
3103                 __u16                  refcheck;
3104
3105                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3106
3107                 env = cl_env_get(&refcheck);
3108                 if (!IS_ERR(env)) {
3109                         osc_io_unplug(env, &obd->u.cli, NULL);
3110
3111                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3112                                                  osc_ldlm_resource_invalidate,
3113                                                  env, 0);
3114                         cl_env_put(env, &refcheck);
3115
3116                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3117                 } else
3118                         rc = PTR_ERR(env);
3119                 break;
3120         }
3121         case IMP_EVENT_ACTIVE: {
3122                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3123                 break;
3124         }
3125         case IMP_EVENT_OCD: {
3126                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3127
3128                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3129                         osc_init_grant(&obd->u.cli, ocd);
3130
3131                 /* See bug 7198 */
3132                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3133                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3134
3135                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3136                 break;
3137         }
3138         case IMP_EVENT_DEACTIVATE: {
3139                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3140                 break;
3141         }
3142         case IMP_EVENT_ACTIVATE: {
3143                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3144                 break;
3145         }
3146         default:
3147                 CERROR("Unknown import event %d\n", event);
3148                 LBUG();
3149         }
3150         RETURN(rc);
3151 }
3152
3153 /**
3154  * Determine whether the lock can be canceled before replaying the lock
3155  * during recovery, see bug16774 for detailed information.
3156  *
3157  * \retval zero the lock can't be canceled
3158  * \retval other ok to cancel
3159  */
3160 static int osc_cancel_weight(struct ldlm_lock *lock)
3161 {
3162         /*
3163          * Cancel all unused and granted extent locks.
3164          */
3165         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3166             ldlm_is_granted(lock) &&
3167             osc_ldlm_weigh_ast(lock) == 0)
3168                 RETURN(1);
3169
3170         RETURN(0);
3171 }
3172
3173 static int brw_queue_work(const struct lu_env *env, void *data)
3174 {
3175         struct client_obd *cli = data;
3176
3177         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3178
3179         osc_io_unplug(env, cli, NULL);
3180         RETURN(0);
3181 }
3182
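/*
 * Common client device setup, exported so that OSC-like devices can
 * reuse it: take a ptlrpcd reference, initialize the client obd,
 * allocate the writeback and LRU ptlrpcd work items, then set up quota
 * and grant shrinking; on failure, everything acquired so far is torn
 * down again.
 */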
3183 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3184 {
3185         struct client_obd *cli = &obd->u.cli;
3186         void *handler;
3187         int rc;
3188
3189         ENTRY;
3190
3191         rc = ptlrpcd_addref();
3192         if (rc)
3193                 RETURN(rc);
3194
3195         rc = client_obd_setup(obd, lcfg);
3196         if (rc)
3197                 GOTO(out_ptlrpcd, rc);
3198
3199
3200         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3201         if (IS_ERR(handler))
3202                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3203         cli->cl_writeback_work = handler;
3204
3205         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3206         if (IS_ERR(handler))
3207                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3208         cli->cl_lru_work = handler;
3209
3210         rc = osc_quota_setup(obd);
3211         if (rc)
3212                 GOTO(out_ptlrpcd_work, rc);
3213
3214         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3215         osc_update_next_shrink(cli);
3216
3217         RETURN(rc);
3218
3219 out_ptlrpcd_work:
3220         if (cli->cl_writeback_work != NULL) {
3221                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3222                 cli->cl_writeback_work = NULL;
3223         }
3224         if (cli->cl_lru_work != NULL) {
3225                 ptlrpcd_destroy_work(cli->cl_lru_work);
3226                 cli->cl_lru_work = NULL;
3227         }
3228         client_obd_cleanup(obd);
3229 out_ptlrpcd:
3230         ptlrpcd_decref();
3231         RETURN(rc);
3232 }
3233 EXPORT_SYMBOL(osc_setup_common);
3234
3235 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3236 {
3237         struct client_obd *cli = &obd->u.cli;
3238         int                adding;
3239         int                added;
3240         int                req_count;
3241         int                rc;
3242
3243         ENTRY;
3244
3245         rc = osc_setup_common(obd, lcfg);
3246         if (rc < 0)
3247                 RETURN(rc);
3248
3249         rc = osc_tunables_init(obd);
3250         if (rc)
3251                 RETURN(rc);
3252
3253         /*
3254          * We try to control the total number of requests with an upper limit,
3255          * osc_reqpool_maxreqcount. A race may occasionally push the allocation
3256          * over that limit, but this is fine.
3257          */
3258         req_count = atomic_read(&osc_pool_req_count);
3259         if (req_count < osc_reqpool_maxreqcount) {
3260                 adding = cli->cl_max_rpcs_in_flight + 2;
3261                 if (req_count + adding > osc_reqpool_maxreqcount)
3262                         adding = osc_reqpool_maxreqcount - req_count;
3263
3264                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3265                 atomic_add(added, &osc_pool_req_count);
3266         }
3267
3268         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3269
3270         spin_lock(&osc_shrink_lock);
3271         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3272         spin_unlock(&osc_shrink_lock);
3273         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3274         cli->cl_import->imp_idle_debug = D_HA;
3275
3276         RETURN(0);
3277 }
3278
3279 int osc_precleanup_common(struct obd_device *obd)
3280 {
3281         struct client_obd *cli = &obd->u.cli;
3282         ENTRY;
3283
3284         /* LU-464
3285          * for the echo client, the export may be on the zombie list;
3286          * wait for the zombie thread to cull it, because cli.cl_import
3287          * will be cleared in client_disconnect_export():
3288          *   class_export_destroy() -> obd_cleanup() ->
3289          *   echo_device_free() -> echo_client_cleanup() ->
3290          *   obd_disconnect() -> osc_disconnect() ->
3291          *   client_disconnect_export()
3292          */
3293         obd_zombie_barrier();
3294         if (cli->cl_writeback_work) {
3295                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3296                 cli->cl_writeback_work = NULL;
3297         }
3298
3299         if (cli->cl_lru_work) {
3300                 ptlrpcd_destroy_work(cli->cl_lru_work);
3301                 cli->cl_lru_work = NULL;
3302         }
3303
3304         obd_cleanup_client_import(obd);
3305         RETURN(0);
3306 }
3307 EXPORT_SYMBOL(osc_precleanup_common);
3308
3309 static int osc_precleanup(struct obd_device *obd)
3310 {
3311         ENTRY;
3312
3313         osc_precleanup_common(obd);
3314
3315         ptlrpc_lprocfs_unregister_obd(obd);
3316         RETURN(0);
3317 }
3318
3319 int osc_cleanup_common(struct obd_device *obd)
3320 {
3321         struct client_obd *cli = &obd->u.cli;
3322         int rc;
3323
3324         ENTRY;
3325
3326         spin_lock(&osc_shrink_lock);
3327         list_del(&cli->cl_shrink_list);
3328         spin_unlock(&osc_shrink_lock);
3329
3330         /* lru cleanup */
3331         if (cli->cl_cache != NULL) {
3332                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3333                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3334                 list_del_init(&cli->cl_lru_osc);
3335                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3336                 cli->cl_lru_left = NULL;
3337                 cl_cache_decref(cli->cl_cache);
3338                 cli->cl_cache = NULL;
3339         }
3340
3341         /* free memory of osc quota cache */
3342         osc_quota_cleanup(obd);
3343
3344         rc = client_obd_cleanup(obd);
3345
3346         ptlrpcd_decref();
3347         RETURN(rc);
3348 }
3349 EXPORT_SYMBOL(osc_cleanup_common);
3350
3351 static struct obd_ops osc_obd_ops = {
3352         .o_owner                = THIS_MODULE,
3353         .o_setup                = osc_setup,
3354         .o_precleanup           = osc_precleanup,
3355         .o_cleanup              = osc_cleanup_common,
3356         .o_add_conn             = client_import_add_conn,
3357         .o_del_conn             = client_import_del_conn,
3358         .o_connect              = client_connect_import,
3359         .o_reconnect            = osc_reconnect,
3360         .o_disconnect           = osc_disconnect,
3361         .o_statfs               = osc_statfs,
3362         .o_statfs_async         = osc_statfs_async,
3363         .o_create               = osc_create,
3364         .o_destroy              = osc_destroy,
3365         .o_getattr              = osc_getattr,
3366         .o_setattr              = osc_setattr,
3367         .o_iocontrol            = osc_iocontrol,
3368         .o_set_info_async       = osc_set_info_async,
3369         .o_import_event         = osc_import_event,
3370         .o_quotactl             = osc_quotactl,
3371 };
3372
3373 static struct shrinker *osc_cache_shrinker;
3374 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3375 DEFINE_SPINLOCK(osc_shrink_lock);
3376
3377 #ifndef HAVE_SHRINKER_COUNT
3378 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3379 {
3380         struct shrink_control scv = {
3381                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3382                 .gfp_mask   = shrink_param(sc, gfp_mask)
3383         };
3384 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3385         struct shrinker *shrinker = NULL;
3386 #endif
3387
3388         (void)osc_cache_shrink_scan(shrinker, &scv);
3389
3390         return osc_cache_shrink_count(shrinker, &scv);
3391 }
3392 #endif
3393
3394 static int __init osc_init(void)
3395 {
3396         unsigned int reqpool_size;
3397         unsigned int reqsize;
3398         int rc;
3399         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3400                          osc_cache_shrink_count, osc_cache_shrink_scan);
3401         ENTRY;
3402
3403         /* print an address of _any_ initialized kernel symbol from this
3404          * module, to allow debugging with a gdb that doesn't support data
3405          * symbols from modules. */
3406         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3407
3408         rc = lu_kmem_init(osc_caches);
3409         if (rc)
3410                 RETURN(rc);
3411
3412         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3413                                  LUSTRE_OSC_NAME, &osc_device_type);
3414         if (rc)
3415                 GOTO(out_kmem, rc);
3416
3417         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3418
3419         /* This is obviously too much memory; only guard against overflow */
3420         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3421                 GOTO(out_type, rc = -EINVAL);
3422
3423         reqpool_size = osc_reqpool_mem_max << 20;
3424
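        /* round the per-request buffer size up to the smallest power of
         * two that can hold OST_IO_MAXREQSIZE */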
3425         reqsize = 1;
3426         while (reqsize < OST_IO_MAXREQSIZE)
3427                 reqsize = reqsize << 1;
3428
3429         /*
3430          * We don't enlarge the request count in the OSC pool according to
3431          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3432          * after a normal allocation has failed, so a small OSC pool won't
3433          * cause much performance degradation in most cases.
3434          */
3435         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3436
3437         atomic_set(&osc_pool_req_count, 0);
3438         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3439                                           ptlrpc_add_rqs_to_pool);
3440
3441         if (osc_rq_pool == NULL)
3442                 GOTO(out_type, rc = -ENOMEM);
3443
3444         rc = osc_start_grant_work();
3445         if (rc != 0)
3446                 GOTO(out_req_pool, rc);
3447
3448         RETURN(rc);
3449
3450 out_req_pool:
3451         ptlrpc_free_rq_pool(osc_rq_pool);
3452 out_type:
3453         class_unregister_type(LUSTRE_OSC_NAME);
3454 out_kmem:
3455         lu_kmem_fini(osc_caches);
3456
3457         RETURN(rc);
3458 }
3459
3460 static void __exit osc_exit(void)
3461 {
3462         osc_stop_grant_work();
3463         remove_shrinker(osc_cache_shrinker);
3464         class_unregister_type(LUSTRE_OSC_NAME);
3465         lu_kmem_fini(osc_caches);
3466         ptlrpc_free_rq_pool(osc_rq_pool);
3467 }
3468
3469 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3470 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3471 MODULE_VERSION(LUSTRE_VERSION_STRING);
3472 MODULE_LICENSE("GPL");
3473
3474 module_init(osc_init);
3475 module_exit(osc_exit);