/* lustre/osc/osc_request.c */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

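/* seconds a connection may sit idle before it is disconnected (idle client
 * disconnect); assumed from the feature this tunable controls, the uses of
 * osc_idle_timeout are outside this file */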
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
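
/* Pack @oa into the request's OST body, converting it to the on-wire obdo
 * format negotiated with the peer (imp_connect_data). */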
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

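/* Send an OST_SETATTR without blocking: with a NULL @rqset the request is
 * handed straight to ptlrpcd and the reply is ignored; otherwise @upcall is
 * invoked with @cookie from osc_setattr_interpret() once the reply arrives. */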
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is different from the case when ELC is not supported at all,
         * in which case we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

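/* Lock-free throttle on concurrent OST_DESTROY RPCs: optimistically bump the
 * in-flight counter and compare it against the RPC limit; on failure undo
 * the bump, waking a waiter if a slot freed up between the two atomics. */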
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

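/* Report client cache usage and grant bookkeeping to the server in @oa:
 * o_dirty is how much we hold dirty, o_undirty is how much more grant we
 * would like, o_grant echoes our current grant and o_dropped returns grant
 * that was lost. */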
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT -
                                    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

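/* A single piece of delayed work walks every registered client, shrinks
 * grant for those that qualify (batched to at most GRANT_SHRINK_RPC_BATCH
 * RPCs per pass) and then re-arms itself for the earliest next deadline. */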
#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli))
                        osc_shrink_grant(cli);

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds())
                schedule_delayed_work(&work, msecs_to_jiffies(
                                        (next_shrink - ktime_get_seconds()) *
                                        MSEC_PER_SEC));
        else
                schedule_work(&work.work);
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler for returning grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

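/* Seed the client's grant accounting from the server's connect data and, if
 * the server supports GRANT_PARAM, derive the chunk size, extent tax and
 * maximum extent size used by osc_extent. */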
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

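/* Two brw_pages can share one remote niobuf only if they are contiguous in
 * file offset and carry identical flags, modulo flags that are known to be
 * safe to combine. */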
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

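/* T10-PI bulk checksum: generate per-sector DIF guard tags for each page,
 * accumulate them in a bounce page, and hash the guard tags themselves with
 * the top-level algorithm so the result covers the whole transfer. */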
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The number of guard slots remaining must be able to hold
                 * the checksums of a whole page.
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending, we only compute the wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending, we only compute the wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

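/* Pick the bulk checksum implementation: T10-PI when the checksum type maps
 * to a DIF function, otherwise a plain hash over the page data. */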
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

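/* Build a BRW RPC: merge file-contiguous pages into remote niobufs, use
 * short I/O (data carried inline in the request) for small transfers when
 * the server supports it, otherwise set up a bulk descriptor; also
 * piggy-back grant accounting and, for writes, an optional bulk checksum. */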
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * oa passed in contains valid o_uid and o_gid for these two
         * operations. Besides, filling o_uid and o_gid is enough for nrs-tbf,
         * see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not set in order to
         * avoid breaking other processing logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
         * sends "max - 1" for compatibility with old clients that send "0",
         * and also so that the actual maximum is a power-of-two number, not
         * one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of the page array */
1431                 LASSERTF(page_count == 1 ||
1432                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1433                           ergo(i > 0 && i < page_count - 1,
1434                                poff == 0 && pg->count == PAGE_SIZE)   &&
1435                           ergo(i == page_count - 1, poff == 0)),
1436                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1437                          i, page_count, pg, pg->off, pg->count);
1438                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1439                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1440                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1441                          i, page_count,
1442                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1443                          pg_prev->pg, page_private(pg_prev->pg),
1444                          pg_prev->pg->index, pg_prev->off);
1445                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1446                         (pg->flag & OBD_BRW_SRVLOCK));
1447                 if (short_io_size != 0 && opc == OST_WRITE) {
1448                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1449
1450                         LASSERT(short_io_size >= requested_nob + pg->count);
1451                         memcpy(short_io_buf + requested_nob,
1452                                ptr + poff,
1453                                pg->count);
1454                         ll_kunmap_atomic(ptr, KM_USER0);
1455                 } else if (short_io_size == 0) {
1456                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1457                                                          pg->count);
1458                 }
1459                 requested_nob += pg->count;
1460
1461                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1462                         niobuf--;
1463                         niobuf->rnb_len += pg->count;
1464                 } else {
1465                         niobuf->rnb_offset = pg->off;
1466                         niobuf->rnb_len    = pg->count;
1467                         niobuf->rnb_flags  = pg->flag;
1468                 }
1469                 pg_prev = pg;
1470         }
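        /* The loop above coalesces pages into a single niobuf whenever
         * can_merge_pages() allows it, so fewer than page_count niobufs may
         * be written; the LASSERTF below checks that the count matches the
         * precomputed niocount.  E.g. three 4KiB pages at offsets 0, 4096
         * and 8192 with identical flags collapse into one niobuf
         * { rnb_offset = 0, rnb_len = 12288 }. */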
1471
1472         LASSERTF((void *)(niobuf - niocount) ==
1473                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1474                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1475                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1476
1477         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1478         if (resend) {
1479                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1480                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1481                         body->oa.o_flags = 0;
1482                 }
1483                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1484         }
1485
1486         if (osc_should_shrink_grant(cli))
1487                 osc_shrink_grant_local(cli, &body->oa);
1488
1489         /* size[REQ_REC_OFF] still sizeof (*body) */
1490         if (opc == OST_WRITE) {
1491                 if (cli->cl_checksum &&
1492                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1493                         /* store cl_cksum_type in a local variable since
1494                          * it can be changed via lprocfs */
1495                         enum cksum_types cksum_type = cli->cl_cksum_type;
1496
1497                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1498                                 body->oa.o_flags = 0;
1499
1500                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1501                                                                 cksum_type);
1502                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1503
1504                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1505                                                   requested_nob, page_count,
1506                                                   pga, OST_WRITE,
1507                                                   &body->oa.o_cksum);
1508                         if (rc < 0) {
1509                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1510                                        rc);
1511                                 GOTO(out, rc);
1512                         }
1513                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1514                                body->oa.o_cksum);
1515
1516                         /* save this in 'oa', too, for later checking */
1517                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1518                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1519                                                            cksum_type);
1520                 } else {
1521                         /* clear out the checksum flag, in case this is a
1522                          * resend but cl_checksum is no longer set. b=11238 */
1523                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1524                 }
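                /* The checksum travels in two places: the value itself in
                 * body->oa.o_cksum, and its type packed into body->oa.o_flags
                 * by obd_cksum_type_pack().  The copies saved back into 'oa'
                 * (above and just below) are what check_write_checksum()
                 * later compares against the value the server echoes back. */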
1525                 oa->o_cksum = body->oa.o_cksum;
1526                 /* 1 RC per niobuf */
1527                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1528                                      sizeof(__u32) * niocount);
1529         } else {
1530                 if (cli->cl_checksum &&
1531                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1532                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1533                                 body->oa.o_flags = 0;
1534                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1535                                 cli->cl_cksum_type);
1536                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1537                 }
1538
1539                 /* The client cksum has already been copied to the wire obdo
1540                  * by the earlier lustre_set_wire_obdo(); if a bulk read is
1541                  * being resent due to a cksum error, this allows the server
1542                  * to check and dump the pages on its side */
1543         }
1544         ptlrpc_request_set_replen(req);
1545
1546         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1547         aa = ptlrpc_req_async_args(req);
1548         aa->aa_oa = oa;
1549         aa->aa_requested_nob = requested_nob;
1550         aa->aa_nio_count = niocount;
1551         aa->aa_page_count = page_count;
1552         aa->aa_resends = 0;
1553         aa->aa_ppga = pga;
1554         aa->aa_cli = cli;
1555         INIT_LIST_HEAD(&aa->aa_oaps);
1556
1557         *reqp = req;
1558         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1559         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1560                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1561                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1562         RETURN(0);
1563
1564  out:
1565         ptlrpc_req_finished(req);
1566         RETURN(rc);
1567 }
1568
1569 char dbgcksum_file_name[PATH_MAX];
1570
1571 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1572                                 struct brw_page **pga, __u32 server_cksum,
1573                                 __u32 client_cksum)
1574 {
1575         struct file *filp;
1576         int rc, i;
1577         unsigned int len;
1578         char *buf;
1579
1580         /* only keep a dump of the pages on the first error for a given
1581          * range in the file/fid, not during subsequent resends/retries. */
1582         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1583                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1584                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1585                   libcfs_debug_file_path_arr :
1586                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1587                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1590                  pga[0]->off,
1591                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1592                  client_cksum, server_cksum);
1593         filp = filp_open(dbgcksum_file_name,
1594                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1595         if (IS_ERR(filp)) {
1596                 rc = PTR_ERR(filp);
1597                 if (rc == -EEXIST)
1598                         CDEBUG(D_INFO, "%s: can't open file to dump pages "
1599                                "with checksum error: rc = %d\n",
1600                                dbgcksum_file_name, rc);
1601                 else
1602                         CERROR("%s: can't open file to dump pages with "
1603                                "checksum error: rc = %d\n", dbgcksum_file_name, rc);
1604                 return;
1605         }
1606
1607         for (i = 0; i < page_count; i++) {
1608                 len = pga[i]->count;
1609                 buf = kmap(pga[i]->pg);
1610                 while (len != 0) {
1611                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1612                         if (rc < 0) {
1613                                 CERROR("%s: wanted to write %u bytes but got "
1614                                        "error: rc = %d\n", dbgcksum_file_name, len, rc);
1615                                 break;
1616                         }
1617                         len -= rc;
1618                         buf += rc;
1619                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1620                                dbgcksum_file_name, rc);
1621                 }
1622                 kunmap(pga[i]->pg);
1623         }
1624
1625         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1626         if (rc)
1627                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1628         filp_close(filp, NULL);
1629         return;
1630 }
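/* With the default debug path this yields a name along the lines of (a
 * hypothetical example, field values made up):
 *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-<client>-<server>
 * i.e. parent FID, byte extent, then client and server checksums, so dumps
 * for different extents and errors are easy to tell apart. */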
1631
1632 static int
1633 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1634                      __u32 client_cksum, __u32 server_cksum,
1635                      struct osc_brw_async_args *aa)
1636 {
1637         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1638         enum cksum_types cksum_type;
1639         obd_dif_csum_fn *fn = NULL;
1640         int sector_size = 0;
1641         __u32 new_cksum;
1642         char *msg;
1643         int rc;
1644
1645         if (server_cksum == client_cksum) {
1646                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1647                 return 0;
1648         }
1649
1650         if (aa->aa_cli->cl_checksum_dump)
1651                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1652                                     server_cksum, client_cksum);
1653
1654         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1655                                            oa->o_flags : 0);
1656
1657         switch (cksum_type) {
1658         case OBD_CKSUM_T10IP512:
1659                 fn = obd_dif_ip_fn;
1660                 sector_size = 512;
1661                 break;
1662         case OBD_CKSUM_T10IP4K:
1663                 fn = obd_dif_ip_fn;
1664                 sector_size = 4096;
1665                 break;
1666         case OBD_CKSUM_T10CRC512:
1667                 fn = obd_dif_crc_fn;
1668                 sector_size = 512;
1669                 break;
1670         case OBD_CKSUM_T10CRC4K:
1671                 fn = obd_dif_crc_fn;
1672                 sector_size = 4096;
1673                 break;
1674         default:
1675                 break;
1676         }
1677
1678         if (fn)
1679                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1680                                              aa->aa_page_count, aa->aa_ppga,
1681                                              OST_WRITE, fn, sector_size,
1682                                              &new_cksum);
1683         else
1684                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1685                                        aa->aa_ppga, OST_WRITE, cksum_type,
1686                                        &new_cksum);
1687
1688         if (rc < 0)
1689                 msg = "failed to calculate the client write checksum";
1690         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1691                 msg = "the server did not use the checksum type specified in "
1692                       "the original request - likely a protocol problem";
1693         else if (new_cksum == server_cksum)
1694                 msg = "changed on the client after we checksummed it - "
1695                       "likely false positive due to mmap IO (bug 11742)";
1696         else if (new_cksum == client_cksum)
1697                 msg = "changed in transit before arrival at OST";
1698         else
1699                 msg = "changed in transit AND doesn't match the original - "
1700                       "likely false positive due to mmap IO (bug 11742)";
1701
1702         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1703                            DFID " object "DOSTID" extent [%llu-%llu], original "
1704                            "client csum %x (type %x), server csum %x (type %x),"
1705                            " client csum now %x\n",
1706                            obd_name, msg, libcfs_nid2str(peer->nid),
1707                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1710                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1711                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1712                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1713                            client_cksum,
1714                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1715                            server_cksum, cksum_type, new_cksum);
1716         return 1;
1717 }
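/* A sketch of the diagnosis above: the client recomputes the checksum over
 * its own pages with the type the server reported, then triangulates:
 *   new == server -> pages changed after the original checksum was taken
 *                    (typically an mmap write racing with the I/O);
 *   new == client -> data was corrupted in transit before reaching the OST;
 *   neither       -> changed in transit AND no longer matches the original.
 * In every mismatch case returning 1 makes the caller resend the write
 * (osc_brw_fini_request() turns it into -EAGAIN). */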
1718
1719 /* Note rc enters this function as number of bytes transferred */
1720 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1721 {
1722         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1723         struct client_obd *cli = aa->aa_cli;
1724         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1725         const struct lnet_process_id *peer =
1726                 &req->rq_import->imp_connection->c_peer;
1727         struct ost_body *body;
1728         u32 client_cksum = 0;
1729         ENTRY;
1730
1731         if (rc < 0 && rc != -EDQUOT) {
1732                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1733                 RETURN(rc);
1734         }
1735
1736         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1737         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1738         if (body == NULL) {
1739                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1740                 RETURN(-EPROTO);
1741         }
1742
1743         /* set/clear over quota flag for a uid/gid/projid */
1744         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1745             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1746                 unsigned qid[LL_MAXQUOTAS] = {
1747                                          body->oa.o_uid, body->oa.o_gid,
1748                                          body->oa.o_projid };
1749                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1750                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1751                        body->oa.o_valid, body->oa.o_flags);
1752                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1753                                 body->oa.o_flags);
1754         }
1755
1756         osc_update_grant(cli, body);
1757
1758         if (rc < 0)
1759                 RETURN(rc);
1760
1761         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1762                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1763
1764         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1765                 if (rc > 0) {
1766                         CERROR("Unexpected positive rc %d\n", rc);
1767                         RETURN(-EPROTO);
1768                 }
1769
1770                 if (req->rq_bulk != NULL &&
1771                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1772                         RETURN(-EAGAIN);
1773
1774                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1775                     check_write_checksum(&body->oa, peer, client_cksum,
1776                                          body->oa.o_cksum, aa))
1777                         RETURN(-EAGAIN);
1778
1779                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1780                                      aa->aa_page_count, aa->aa_ppga);
1781                 GOTO(out, rc);
1782         }
1783
1784         /* The rest of this function executes only for OST_READs */
1785
1786         if (req->rq_bulk == NULL) {
1787                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1788                                           RCL_SERVER);
1789                 LASSERT(rc == req->rq_status);
1790         } else {
1791                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1792                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1793         }
1794         if (rc < 0)
1795                 GOTO(out, rc = -EAGAIN);
1796
1797         if (rc > aa->aa_requested_nob) {
1798                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1799                        aa->aa_requested_nob);
1800                 RETURN(-EPROTO);
1801         }
1802
1803         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1804                 CERROR("Unexpected rc %d (%d transferred)\n",
1805                        rc, req->rq_bulk->bd_nob_transferred);
1806                 RETURN(-EPROTO);
1807         }
1808
1809         if (req->rq_bulk == NULL) {
1810                 /* short io */
1811                 int nob, pg_count, i = 0;
1812                 unsigned char *buf;
1813
1814                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1815                 pg_count = aa->aa_page_count;
1816                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1817                                                    rc);
1818                 nob = rc;
1819                 while (nob > 0 && pg_count > 0) {
1820                         unsigned char *ptr;
1821                         int count = aa->aa_ppga[i]->count > nob ?
1822                                     nob : aa->aa_ppga[i]->count;
1823
1824                         CDEBUG(D_CACHE, "page %p count %d\n",
1825                                aa->aa_ppga[i]->pg, count);
1826                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1827                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1828                                count);
1829                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1830
1831                         buf += count;
1832                         nob -= count;
1833                         i++;
1834                         pg_count--;
1835                 }
1836         }
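        /* For a short I/O read the server returned the data inline in the
         * reply's RMF_SHORT_IO buffer (rc bytes of it), so the loop above
         * does what the bulk transfer normally would: it fans the flat
         * buffer back out into the destination pages under atomic kmaps. */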
1837
1838         if (rc < aa->aa_requested_nob)
1839                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1840
1841         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1842                 static int cksum_counter;
1843                 u32        server_cksum = body->oa.o_cksum;
1844                 char      *via = "";
1845                 char      *router = "";
1846                 enum cksum_types cksum_type;
1847                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1848                         body->oa.o_flags : 0;
1849
1850                 cksum_type = obd_cksum_type_unpack(o_flags);
1851                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1852                                           aa->aa_page_count, aa->aa_ppga,
1853                                           OST_READ, &client_cksum);
1854                 if (rc < 0)
1855                         GOTO(out, rc);
1856
1857                 if (req->rq_bulk != NULL &&
1858                     peer->nid != req->rq_bulk->bd_sender) {
1859                         via = " via ";
1860                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1861                 }
1862
1863                 if (server_cksum != client_cksum) {
1864                         struct ost_body *clbody;
1865                         u32 page_count = aa->aa_page_count;
1866
1867                         clbody = req_capsule_client_get(&req->rq_pill,
1868                                                         &RMF_OST_BODY);
1869                         if (cli->cl_checksum_dump)
1870                                 dump_all_bulk_pages(&clbody->oa, page_count,
1871                                                     aa->aa_ppga, server_cksum,
1872                                                     client_cksum);
1873
1874                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1875                                            "%s%s%s inode "DFID" object "DOSTID
1876                                            " extent [%llu-%llu], client %x, "
1877                                            "server %x, cksum_type %x\n",
1878                                            obd_name,
1879                                            libcfs_nid2str(peer->nid),
1880                                            via, router,
1881                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1882                                                 clbody->oa.o_parent_seq : 0ULL,
1883                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1884                                                 clbody->oa.o_parent_oid : 0,
1885                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1886                                                 clbody->oa.o_parent_ver : 0,
1887                                            POSTID(&body->oa.o_oi),
1888                                            aa->aa_ppga[0]->off,
1889                                            aa->aa_ppga[page_count-1]->off +
1890                                            aa->aa_ppga[page_count-1]->count - 1,
1891                                            client_cksum, server_cksum,
1892                                            cksum_type);
1893                         cksum_counter = 0;
1894                         aa->aa_oa->o_cksum = client_cksum;
1895                         rc = -EAGAIN;
1896                 } else {
1897                         cksum_counter++;
1898                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1899                         rc = 0;
1900                 }
1901         } else if (unlikely(client_cksum)) {
1902                 static int cksum_missed;
1903
1904                 cksum_missed++;
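                /* self rate-limiting: only log when cksum_missed is a power
                 * of two, since (x & -x) == x exactly for powers of two */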
1905                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1906                         CERROR("Checksum %u requested from %s but not sent\n",
1907                                cksum_missed, libcfs_nid2str(peer->nid));
1908         } else {
1909                 rc = 0;
1910         }
1911 out:
1912         if (rc >= 0)
1913                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1914                                      aa->aa_oa, &body->oa);
1915
1916         RETURN(rc);
1917 }
1918
1919 static int osc_brw_redo_request(struct ptlrpc_request *request,
1920                                 struct osc_brw_async_args *aa, int rc)
1921 {
1922         struct ptlrpc_request *new_req;
1923         struct osc_brw_async_args *new_aa;
1924         struct osc_async_page *oap;
1925         ENTRY;
1926
1927         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1928                   "redo for recoverable error %d", rc);
1929
1930         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1931                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1932                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1933                                   aa->aa_ppga, &new_req, 1);
1934         if (rc)
1935                 RETURN(rc);
1936
1937         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1938                 if (oap->oap_request != NULL) {
1939                         LASSERTF(request == oap->oap_request,
1940                                  "request %p != oap_request %p\n",
1941                                  request, oap->oap_request);
1942                         if (oap->oap_interrupted) {
1943                                 ptlrpc_req_finished(new_req);
1944                                 RETURN(-EINTR);
1945                         }
1946                 }
1947         }
1948         /*
1949          * New request takes over pga and oaps from old request.
1950          * Note that copying a list_head doesn't work, need to move it...
1951          */
1952         aa->aa_resends++;
1953         new_req->rq_interpret_reply = request->rq_interpret_reply;
1954         new_req->rq_async_args = request->rq_async_args;
1955         new_req->rq_commit_cb = request->rq_commit_cb;
1956         /* cap resend delay to the current request timeout, this is similar to
1957          * what ptlrpc does (see after_reply()) */
1958         if (aa->aa_resends > new_req->rq_timeout)
1959                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1960         else
1961                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1962         new_req->rq_generation_set = 1;
1963         new_req->rq_import_generation = request->rq_import_generation;
1964
1965         new_aa = ptlrpc_req_async_args(new_req);
1966
1967         INIT_LIST_HEAD(&new_aa->aa_oaps);
1968         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1969         INIT_LIST_HEAD(&new_aa->aa_exts);
1970         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1971         new_aa->aa_resends = aa->aa_resends;
1972
1973         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1974                 if (oap->oap_request) {
1975                         ptlrpc_req_finished(oap->oap_request);
1976                         oap->oap_request = ptlrpc_request_addref(new_req);
1977                 }
1978         }
1979
1980         /* XXX: This code will run into problems if we ever support adding
1981          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1982          * waiting for all of them to finish. We should inherit the request
1983          * set from the old request. */
1984         ptlrpcd_add_req(new_req);
1985
1986         DEBUG_REQ(D_INFO, new_req, "new request");
1987         RETURN(0);
1988 }
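/* The resend delay above grows linearly with the retry count and is capped at
 * the request timeout: e.g. the 3rd resend is deferred by 3 seconds, and once
 * aa_resends exceeds rq_timeout every further resend waits rq_timeout
 * seconds, mirroring what ptlrpc itself does in after_reply(). */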
1989
1990 /*
1991  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1992  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1993  * fine for our small page arrays and doesn't require allocation.  It's an
1994  * insertion sort that swaps elements that are strides apart, shrinking the
1995  * stride down until it's 1 and the array is sorted.
1996  */
1997 static void sort_brw_pages(struct brw_page **array, int num)
1998 {
1999         int stride, i, j;
2000         struct brw_page *tmp;
2001
2002         if (num == 1)
2003                 return;
2004         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2005                 ;
2006
2007         do {
2008                 stride /= 3;
2009                 for (i = stride ; i < num ; i++) {
2010                         tmp = array[i];
2011                         j = i;
2012                         while (j >= stride && array[j - stride]->off > tmp->off) {
2013                                 array[j] = array[j - stride];
2014                                 j -= stride;
2015                         }
2016                         array[j] = tmp;
2017                 }
2018         } while (stride > 1);
2019 }
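/* Gap-sequence example: for num = 20 the first loop grows the stride
 * 1 -> 4 -> 13 -> 40 and stops, so the do/while above runs passes with
 * strides 13, 4 and finally 1 -- the last pass being the plain insertion
 * sort that leaves the array fully sorted. */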
2020
2021 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2022 {
2023         LASSERT(ppga != NULL);
2024         OBD_FREE(ppga, sizeof(*ppga) * count);
2025 }
2026
2027 static int brw_interpret(const struct lu_env *env,
2028                          struct ptlrpc_request *req, void *args, int rc)
2029 {
2030         struct osc_brw_async_args *aa = args;
2031         struct osc_extent *ext;
2032         struct osc_extent *tmp;
2033         struct client_obd *cli = aa->aa_cli;
2034         unsigned long transferred = 0;
2035
2036         ENTRY;
2037
2038         rc = osc_brw_fini_request(req, rc);
2039         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2040         /*
2041          * When server returns -EINPROGRESS, client should always retry
2042          * regardless of the number of times the bulk was resent already.
2043          */
2044         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2045                 if (req->rq_import_generation !=
2046                     req->rq_import->imp_generation) {
2047                         CDEBUG(D_HA, "%s: resend across eviction for object "
2048                                DOSTID", rc = %d.\n",
2049                                req->rq_import->imp_obd->obd_name,
2050                                POSTID(&aa->aa_oa->o_oi), rc);
2051                 } else if (rc == -EINPROGRESS ||
2052                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2053                         rc = osc_brw_redo_request(req, aa, rc);
2054                 } else {
2055                         CERROR("%s: too many resend retries for object "
2056                                DOSTID", rc = %d.\n",
2057                                req->rq_import->imp_obd->obd_name,
2058                                POSTID(&aa->aa_oa->o_oi), rc);
2059                 }
2060
2061                 if (rc == 0)
2062                         RETURN(0);
2063                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2064                         rc = -EIO;
2065         }
2066
2067         if (rc == 0) {
2068                 struct obdo *oa = aa->aa_oa;
2069                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2070                 unsigned long valid = 0;
2071                 struct cl_object *obj;
2072                 struct osc_async_page *last;
2073
2074                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2075                 obj = osc2cl(last->oap_obj);
2076
2077                 cl_object_attr_lock(obj);
2078                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2079                         attr->cat_blocks = oa->o_blocks;
2080                         valid |= CAT_BLOCKS;
2081                 }
2082                 if (oa->o_valid & OBD_MD_FLMTIME) {
2083                         attr->cat_mtime = oa->o_mtime;
2084                         valid |= CAT_MTIME;
2085                 }
2086                 if (oa->o_valid & OBD_MD_FLATIME) {
2087                         attr->cat_atime = oa->o_atime;
2088                         valid |= CAT_ATIME;
2089                 }
2090                 if (oa->o_valid & OBD_MD_FLCTIME) {
2091                         attr->cat_ctime = oa->o_ctime;
2092                         valid |= CAT_CTIME;
2093                 }
2094
2095                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2096                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2097                         loff_t last_off = last->oap_count + last->oap_obj_off +
2098                                 last->oap_page_off;
2099
2100                         /* Change the file size if this is an out-of-quota or
2101                          * direct I/O write that extends the file size */
2102                         if (loi->loi_lvb.lvb_size < last_off) {
2103                                 attr->cat_size = last_off;
2104                                 valid |= CAT_SIZE;
2105                         }
2106                         /* Extend KMS if it's not a lockless write */
2107                         if (loi->loi_kms < last_off &&
2108                             oap2osc_page(last)->ops_srvlock == 0) {
2109                                 attr->cat_kms = last_off;
2110                                 valid |= CAT_KMS;
2111                         }
2112                 }
2113
2114                 if (valid != 0)
2115                         cl_object_attr_update(env, obj, attr, valid);
2116                 cl_object_attr_unlock(obj);
2117         }
2118         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2119
2120         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2121                 osc_inc_unstable_pages(req);
2122
2123         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2124                 list_del_init(&ext->oe_link);
2125                 osc_extent_finish(env, ext, 1,
2126                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2127         }
2128         LASSERT(list_empty(&aa->aa_exts));
2129         LASSERT(list_empty(&aa->aa_oaps));
2130
2131         transferred = (req->rq_bulk == NULL ? /* short io */
2132                        aa->aa_requested_nob :
2133                        req->rq_bulk->bd_nob_transferred);
2134
2135         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2136         ptlrpc_lprocfs_brw(req, transferred);
2137
2138         spin_lock(&cli->cl_loi_list_lock);
2139         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2140          * is called so we know whether to go to sync BRWs or wait for more
2141          * RPCs to complete */
2142         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2143                 cli->cl_w_in_flight--;
2144         else
2145                 cli->cl_r_in_flight--;
2146         osc_wake_cache_waiters(cli);
2147         spin_unlock(&cli->cl_loi_list_lock);
2148
2149         osc_io_unplug(env, cli, NULL);
2150         RETURN(rc);
2151 }
2152
2153 static void brw_commit(struct ptlrpc_request *req)
2154 {
2155         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2156          * this callback (invoked via rq_commit_cb), we need to ensure
2157          * osc_dec_unstable_pages is still called. Otherwise unstable
2158          * pages may be leaked. */
2159         spin_lock(&req->rq_lock);
2160         if (likely(req->rq_unstable)) {
2161                 req->rq_unstable = 0;
2162                 spin_unlock(&req->rq_lock);
2163
2164                 osc_dec_unstable_pages(req);
2165         } else {
2166                 req->rq_committed = 1;
2167                 spin_unlock(&req->rq_lock);
2168         }
2169 }
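/* Roughly: whichever side loses the race still balances the accounting.  If
 * the unstable-page increment already happened, rq_unstable is set and this
 * callback performs the decrement; otherwise rq_committed records the commit
 * so the increment path can (presumably) see it and balance immediately.
 * Either way osc_dec_unstable_pages() runs exactly once per request. */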
2170
2171 /**
2172  * Build an RPC from the list of extents @ext_list. The caller must ensure
2173  * that the total number of pages in this list does NOT exceed the maximum
2174  * pages per RPC. Extents in the list must be in OES_RPC state.
2175  */
2176 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2177                   struct list_head *ext_list, int cmd)
2178 {
2179         struct ptlrpc_request           *req = NULL;
2180         struct osc_extent               *ext;
2181         struct brw_page                 **pga = NULL;
2182         struct osc_brw_async_args       *aa = NULL;
2183         struct obdo                     *oa = NULL;
2184         struct osc_async_page           *oap;
2185         struct osc_object               *obj = NULL;
2186         struct cl_req_attr              *crattr = NULL;
2187         loff_t                          starting_offset = OBD_OBJECT_EOF;
2188         loff_t                          ending_offset = 0;
2189         int                             mpflag = 0;
2190         int                             mem_tight = 0;
2191         int                             page_count = 0;
2192         bool                            soft_sync = false;
2193         bool                            interrupted = false;
2194         bool                            ndelay = false;
2195         int                             i;
2196         int                             grant = 0;
2197         int                             rc;
2198         __u32                           layout_version = 0;
2199         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2200         struct ost_body                 *body;
2201         ENTRY;
2202         LASSERT(!list_empty(ext_list));
2203
2204         /* add pages into rpc_list to build BRW rpc */
2205         list_for_each_entry(ext, ext_list, oe_link) {
2206                 LASSERT(ext->oe_state == OES_RPC);
2207                 mem_tight |= ext->oe_memalloc;
2208                 grant += ext->oe_grants;
2209                 page_count += ext->oe_nr_pages;
2210                 layout_version = MAX(layout_version, ext->oe_layout_version);
2211                 if (obj == NULL)
2212                         obj = ext->oe_obj;
2213         }
2214
2215         soft_sync = osc_over_unstable_soft_limit(cli);
2216         if (mem_tight)
2217                 mpflag = cfs_memory_pressure_get_and_set();
2218
2219         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2220         if (pga == NULL)
2221                 GOTO(out, rc = -ENOMEM);
2222
2223         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2224         if (oa == NULL)
2225                 GOTO(out, rc = -ENOMEM);
2226
2227         i = 0;
2228         list_for_each_entry(ext, ext_list, oe_link) {
2229                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2230                         if (mem_tight)
2231                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2232                         if (soft_sync)
2233                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2234                         pga[i] = &oap->oap_brw_page;
2235                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2236                         i++;
2237
2238                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2239                         if (starting_offset == OBD_OBJECT_EOF ||
2240                             starting_offset > oap->oap_obj_off)
2241                                 starting_offset = oap->oap_obj_off;
2242                         else
2243                                 LASSERT(oap->oap_page_off == 0);
2244                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2245                                 ending_offset = oap->oap_obj_off +
2246                                                 oap->oap_count;
2247                         else
2248                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2249                                         PAGE_SIZE);
2250                         if (oap->oap_interrupted)
2251                                 interrupted = true;
2252                 }
2253                 if (ext->oe_ndelay)
2254                         ndelay = true;
2255         }
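        /* The assertions above enforce that the gathered pages tile the RPC's
         * byte range: only the lowest-offset page may start at a non-zero
         * in-page offset and only the highest may end short of PAGE_SIZE;
         * every interior page must be a full page.  osc_brw_prep_request()
         * re-checks the same invariant per page. */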
2256
2257         /* first page in the list */
2258         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2259
2260         crattr = &osc_env_info(env)->oti_req_attr;
2261         memset(crattr, 0, sizeof(*crattr));
2262         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2263         crattr->cra_flags = ~0ULL;
2264         crattr->cra_page = oap2cl_page(oap);
2265         crattr->cra_oa = oa;
2266         cl_req_attr_set(env, osc2cl(obj), crattr);
2267
2268         if (cmd == OBD_BRW_WRITE) {
2269                 oa->o_grant_used = grant;
2270                 if (layout_version > 0) {
2271                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2272                                PFID(&oa->o_oi.oi_fid), layout_version);
2273
2274                         oa->o_layout_version = layout_version;
2275                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2276                 }
2277         }
2278
2279         sort_brw_pages(pga, page_count);
2280         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2281         if (rc != 0) {
2282                 CERROR("prep_req failed: %d\n", rc);
2283                 GOTO(out, rc);
2284         }
2285
2286         req->rq_commit_cb = brw_commit;
2287         req->rq_interpret_reply = brw_interpret;
2288         req->rq_memalloc = mem_tight != 0;
2289         oap->oap_request = ptlrpc_request_addref(req);
2290         if (interrupted && !req->rq_intr)
2291                 ptlrpc_mark_interrupted(req);
2292         if (ndelay) {
2293                 req->rq_no_resend = req->rq_no_delay = 1;
2294                 /* We should probably set a shorter timeout value here so
2295                  * that ETIMEDOUT is handled in brw_interpret() correctly. */
2296                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2297         }
2298
2299         /* Need to update the timestamps after the request is built in case
2300          * we race with setattr (locally or in queue at OST).  If OST gets
2301          * later setattr before earlier BRW (as determined by the request xid),
2302          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2303          * way to do this in a single call.  bug 10150 */
2304         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305         crattr->cra_oa = &body->oa;
2306         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2307         cl_req_attr_set(env, osc2cl(obj), crattr);
2308         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2309
2310         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2311         aa = ptlrpc_req_async_args(req);
2312         INIT_LIST_HEAD(&aa->aa_oaps);
2313         list_splice_init(&rpc_list, &aa->aa_oaps);
2314         INIT_LIST_HEAD(&aa->aa_exts);
2315         list_splice_init(ext_list, &aa->aa_exts);
2316
2317         spin_lock(&cli->cl_loi_list_lock);
2318         starting_offset >>= PAGE_SHIFT;
2319         if (cmd == OBD_BRW_READ) {
2320                 cli->cl_r_in_flight++;
2321                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2322                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2323                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2324                                       starting_offset + 1);
2325         } else {
2326                 cli->cl_w_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2330                                       starting_offset + 1);
2331         }
2332         spin_unlock(&cli->cl_loi_list_lock);
2333
2334         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2335                   page_count, aa, cli->cl_r_in_flight,
2336                   cli->cl_w_in_flight);
2337         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2338
2339         ptlrpcd_add_req(req);
2340         rc = 0;
2341         EXIT;
2342
2343 out:
2344         if (mem_tight != 0)
2345                 cfs_memory_pressure_restore(mpflag);
2346
2347         if (rc != 0) {
2348                 LASSERT(req == NULL);
2349
2350                 if (oa)
2351                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2352                 if (pga)
2353                         OBD_FREE(pga, sizeof(*pga) * page_count);
2354                 /* This should happen rarely and is pretty bad: it makes the
2355                  * pending list stop following the dirty order */
2356                 while (!list_empty(ext_list)) {
2357                         ext = list_entry(ext_list->next, struct osc_extent,
2358                                          oe_link);
2359                         list_del_init(&ext->oe_link);
2360                         osc_extent_finish(env, ext, 0, rc);
2361                 }
2362         }
2363         RETURN(rc);
2364 }
2365
2366 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2367 {
2368         int set = 0;
2369
2370         LASSERT(lock != NULL);
2371
2372         lock_res_and_lock(lock);
2373
2374         if (lock->l_ast_data == NULL)
2375                 lock->l_ast_data = data;
2376         if (lock->l_ast_data == data)
2377                 set = 1;
2378
2379         unlock_res_and_lock(lock);
2380
2381         return set;
2382 }
2383
2384 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2385                      void *cookie, struct lustre_handle *lockh,
2386                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2387                      int errcode)
2388 {
2389         bool intent = *flags & LDLM_FL_HAS_INTENT;
2390         int rc;
2391         ENTRY;
2392
2393         /* The request was created before ldlm_cli_enqueue call. */
2394         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2395                 struct ldlm_reply *rep;
2396
2397                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2398                 LASSERT(rep != NULL);
2399
2400                 rep->lock_policy_res1 =
2401                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2402                 if (rep->lock_policy_res1)
2403                         errcode = rep->lock_policy_res1;
2404                 if (!speculative)
2405                         *flags |= LDLM_FL_LVB_READY;
2406         } else if (errcode == ELDLM_OK) {
2407                 *flags |= LDLM_FL_LVB_READY;
2408         }
2409
2410         /* Call the update callback. */
2411         rc = (*upcall)(cookie, lockh, errcode);
2412
2413         /* release the reference taken in ldlm_cli_enqueue() */
2414         if (errcode == ELDLM_LOCK_MATCHED)
2415                 errcode = ELDLM_OK;
2416         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2417                 ldlm_lock_decref(lockh, mode);
2418
2419         RETURN(rc);
2420 }
2421
2422 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2423                           void *args, int rc)
2424 {
2425         struct osc_enqueue_args *aa = args;
2426         struct ldlm_lock *lock;
2427         struct lustre_handle *lockh = &aa->oa_lockh;
2428         enum ldlm_mode mode = aa->oa_mode;
2429         struct ost_lvb *lvb = aa->oa_lvb;
2430         __u32 lvb_len = sizeof(*lvb);
2431         __u64 flags = 0;
2432
2433         ENTRY;
2434
2435         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2436          * be valid. */
2437         lock = ldlm_handle2lock(lockh);
2438         LASSERTF(lock != NULL,
2439                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2440                  lockh->cookie, req, aa);
2441
2442         /* Take an additional reference so that a blocking AST that
2443          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2444          * to arrive after an upcall has been executed by
2445          * osc_enqueue_fini(). */
2446         ldlm_lock_addref(lockh, mode);
2447
2448         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2449         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2450
2451         /* Let the CP AST grant the lock first. */
2452         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2453
2454         if (aa->oa_speculative) {
2455                 LASSERT(aa->oa_lvb == NULL);
2456                 LASSERT(aa->oa_flags == NULL);
2457                 aa->oa_flags = &flags;
2458         }
2459
2460         /* Complete obtaining the lock procedure. */
2461         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2462                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2463                                    lockh, rc);
2464         /* Complete osc stuff. */
2465         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2466                               aa->oa_flags, aa->oa_speculative, rc);
2467
2468         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2469
2470         ldlm_lock_decref(lockh, mode);
2471         LDLM_LOCK_PUT(lock);
2472         RETURN(rc);
2473 }
2474
2475 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2476
2477 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2478  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2479  * with other synchronous requests, but holding some locks while trying to
2480  * obtain others may take a considerable amount of time in the case of an OST
2481  * failure; and when other sync requests cannot get a lock released by a
2482  * client, that client is evicted from the cluster -- such scenarios make life
2483  * difficult, so release locks just after they are obtained. */
2484 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2485                      __u64 *flags, union ldlm_policy_data *policy,
2486                      struct ost_lvb *lvb, int kms_valid,
2487                      osc_enqueue_upcall_f upcall, void *cookie,
2488                      struct ldlm_enqueue_info *einfo,
2489                      struct ptlrpc_request_set *rqset, int async,
2490                      bool speculative)
2491 {
2492         struct obd_device *obd = exp->exp_obd;
2493         struct lustre_handle lockh = { 0 };
2494         struct ptlrpc_request *req = NULL;
2495         int intent = *flags & LDLM_FL_HAS_INTENT;
2496         __u64 match_flags = *flags;
2497         enum ldlm_mode mode;
2498         int rc;
2499         ENTRY;
2500
2501         /* Filesystem lock extents are extended to page boundaries so that
2502          * dealing with the page cache is a little smoother.  */
2503         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2504         policy->l_extent.end |= ~PAGE_MASK;
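        /* Worked example (4KiB pages, so ~PAGE_MASK == 4095): a request for
         * bytes [5000, 6000] is widened to [4096, 8191]:
         *   start: 5000 - (5000 & 4095) = 4096
         *   end:   6000 | 4095          = 8191 */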
2505
2506         /*
2507          * kms is not valid when either object is completely fresh (so that no
2508          * locks are cached), or object was evicted. In the latter case cached
2509          * lock cannot be used, because it would prime inode state with
2510          * potentially stale LVB.
2511          */
2512         if (!kms_valid)
2513                 goto no_match;
2514
2515         /* Next, search for already existing extent locks that will cover us */
2516         /* If we're trying to read, we also search for an existing PW lock.  The
2517          * VFS and page cache already protect us locally, so lots of readers/
2518          * writers can share a single PW lock.
2519          *
2520          * There are problems with conversion deadlocks, so instead of
2521          * converting a read lock to a write lock, we'll just enqueue a new
2522          * one.
2523          *
2524          * At some point we should cancel the read lock instead of making them
2525          * send us a blocking callback, but there are problems with canceling
2526          * locks out from other users right now, too. */
2527         mode = einfo->ei_mode;
2528         if (einfo->ei_mode == LCK_PR)
2529                 mode |= LCK_PW;
2530         /* Normal lock requests must wait for the LVB to be ready before
2531          * matching a lock; speculative lock requests do not need to,
2532          * because they will not actually use the lock. */
2533         if (!speculative)
2534                 match_flags |= LDLM_FL_LVB_READY;
2535         if (intent != 0)
2536                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2537         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2538                                einfo->ei_type, policy, mode, &lockh, 0);
2539         if (mode) {
2540                 struct ldlm_lock *matched;
2541
2542                 if (*flags & LDLM_FL_TEST_LOCK)
2543                         RETURN(ELDLM_OK);
2544
2545                 matched = ldlm_handle2lock(&lockh);
2546                 if (speculative) {
2547                         /* This DLM lock request is speculative, and does not
2548                          * have an associated IO request. Therefore if there
2549                          * is already a DLM lock, it will just inform the
2550                          * caller to cancel the request for this stripe. */
2551                         lock_res_and_lock(matched);
2552                         if (ldlm_extent_equal(&policy->l_extent,
2553                             &matched->l_policy_data.l_extent))
2554                                 rc = -EEXIST;
2555                         else
2556                                 rc = -ECANCELED;
2557                         unlock_res_and_lock(matched);
2558
2559                         ldlm_lock_decref(&lockh, mode);
2560                         LDLM_LOCK_PUT(matched);
2561                         RETURN(rc);
2562                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2563                         *flags |= LDLM_FL_LVB_READY;
2564
2565                         /* We already have a lock, and it's referenced. */
2566                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2567
2568                         ldlm_lock_decref(&lockh, mode);
2569                         LDLM_LOCK_PUT(matched);
2570                         RETURN(ELDLM_OK);
2571                 } else {
2572                         ldlm_lock_decref(&lockh, mode);
2573                         LDLM_LOCK_PUT(matched);
2574                 }
2575         }
2576
2577 no_match:
2578         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2579                 RETURN(-ENOLCK);
2580
2581         if (intent) {
2582                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2583                                            &RQF_LDLM_ENQUEUE_LVB);
2584                 if (req == NULL)
2585                         RETURN(-ENOMEM);
2586
2587                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2588                 if (rc) {
2589                         ptlrpc_request_free(req);
2590                         RETURN(rc);
2591                 }
2592
2593                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2594                                      sizeof(*lvb));
2595                 ptlrpc_request_set_replen(req);
2596         }
2597
2598         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2599         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2600
2601         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2602                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2603         if (async) {
2604                 if (!rc) {
2605                         struct osc_enqueue_args *aa;
2606                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2607                         aa = ptlrpc_req_async_args(req);
2608                         aa->oa_exp         = exp;
2609                         aa->oa_mode        = einfo->ei_mode;
2610                         aa->oa_type        = einfo->ei_type;
2611                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2612                         aa->oa_upcall      = upcall;
2613                         aa->oa_cookie      = cookie;
2614                         aa->oa_speculative = speculative;
2615                         if (!speculative) {
2616                                 aa->oa_flags  = flags;
2617                                 aa->oa_lvb    = lvb;
2618                         } else {
2619                                 /* speculative locks essentially enqueue a
2620                                  * DLM lock in advance, so we don't care
2621                                  * about the result of the enqueue. */
2622                                 aa->oa_lvb    = NULL;
2623                                 aa->oa_flags  = NULL;
2624                         }
2625
2626                         req->rq_interpret_reply = osc_enqueue_interpret;
2627                         if (rqset == PTLRPCD_SET)
2628                                 ptlrpcd_add_req(req);
2629                         else
2630                                 ptlrpc_set_add_req(rqset, req);
2631                 } else if (intent) {
2632                         ptlrpc_req_finished(req);
2633                 }
2634                 RETURN(rc);
2635         }
2636
2637         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2638                               flags, speculative, rc);
2639         if (intent)
2640                 ptlrpc_req_finished(req);
2641
2642         RETURN(rc);
2643 }
2644
2645 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2646                    enum ldlm_type type, union ldlm_policy_data *policy,
2647                    enum ldlm_mode mode, __u64 *flags, void *data,
2648                    struct lustre_handle *lockh, int unref)
2649 {
2650         struct obd_device *obd = exp->exp_obd;
2651         __u64 lflags = *flags;
2652         enum ldlm_mode rc;
2653         ENTRY;
2654
2655         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2656                 RETURN(-EIO);
2657
2658         /* Filesystem lock extents are extended to page boundaries so that
2659          * dealing with the page cache is a little smoother */
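        /* for example, with 4KB pages ~PAGE_MASK is 0xfff, so an extent
         * of [5000, 6000] is widened to [4096, 8191] */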
2660         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2661         policy->l_extent.end |= ~PAGE_MASK;
2662
2663         /* Next, search for already existing extent locks that will cover us */
2664         /* If we're trying to read, we also search for an existing PW lock.  The
2665          * VFS and page cache already protect us locally, so lots of readers/
2666          * writers can share a single PW lock. */
2667         rc = mode;
2668         if (mode == LCK_PR)
2669                 rc |= LCK_PW;
2670         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2671                              res_id, type, policy, rc, lockh, unref);
2672         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2673                 RETURN(rc);
2674
2675         if (data != NULL) {
2676                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2677
2678                 LASSERT(lock != NULL);
2679                 if (!osc_set_lock_data(lock, data)) {
2680                         ldlm_lock_decref(lockh, rc);
2681                         rc = 0;
2682                 }
2683                 LDLM_LOCK_PUT(lock);
2684         }
2685         RETURN(rc);
2686 }
2687
2688 static int osc_statfs_interpret(const struct lu_env *env,
2689                                 struct ptlrpc_request *req, void *args, int rc)
2690 {
2691         struct osc_async_args *aa = args;
2692         struct obd_statfs *msfs;
2693
2694         ENTRY;
2695         if (rc == -EBADR)
2696                 /*
2697                  * The request has in fact never been sent due to issues at
2698                  * a higher level (LOV).  Exit immediately since the caller
2699                  * is aware of the problem and takes care of the clean up.
2700                  */
2701                 RETURN(rc);
2702
2703         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2704             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2705                 GOTO(out, rc = 0);
2706
2707         if (rc != 0)
2708                 GOTO(out, rc);
2709
2710         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2711         if (msfs == NULL)
2712                 GOTO(out, rc = -EPROTO);
2713
2714         *aa->aa_oi->oi_osfs = *msfs;
2715 out:
2716         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2717
2718         RETURN(rc);
2719 }
2720
2721 static int osc_statfs_async(struct obd_export *exp,
2722                             struct obd_info *oinfo, time64_t max_age,
2723                             struct ptlrpc_request_set *rqset)
2724 {
2725         struct obd_device     *obd = class_exp2obd(exp);
2726         struct ptlrpc_request *req;
2727         struct osc_async_args *aa;
2728         int rc;
2729         ENTRY;
2730
2731         /* We could possibly pass max_age in the request (as an absolute
2732          * timestamp or a "seconds.usec ago") so the target can avoid doing
2733          * extra calls into the filesystem if that isn't necessary (e.g.
2734          * during mount, where that would help a bit).  Having relative timestamps
2735          * is not so great if request processing is slow, while absolute
2736          * timestamps are not ideal because they need time synchronization. */
2737         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2738         if (req == NULL)
2739                 RETURN(-ENOMEM);
2740
2741         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2742         if (rc) {
2743                 ptlrpc_request_free(req);
2744                 RETURN(rc);
2745         }
2746         ptlrpc_request_set_replen(req);
2747         req->rq_request_portal = OST_CREATE_PORTAL;
2748         ptlrpc_at_set_req_timeout(req);
2749
2750         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2751                 /* procfs requests must not block on statfs, to avoid deadlock */
2752                 req->rq_no_resend = 1;
2753                 req->rq_no_delay = 1;
2754         }
2755
2756         req->rq_interpret_reply = osc_statfs_interpret;
2757         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2758         aa = ptlrpc_req_async_args(req);
2759         aa->aa_oi = oinfo;
2760
2761         ptlrpc_set_add_req(rqset, req);
2762         RETURN(0);
2763 }
2764
2765 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2766                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2767 {
2768         struct obd_device     *obd = class_exp2obd(exp);
2769         struct obd_statfs     *msfs;
2770         struct ptlrpc_request *req;
2771         struct obd_import     *imp = NULL;
2772         int rc;
2773         ENTRY;
2774
2776         /* Since the request might also come from lprocfs, we need to
2777          * sync this with client_disconnect_export(); see bug 15684. */
2778         down_read(&obd->u.cli.cl_sem);
2779         if (obd->u.cli.cl_import)
2780                 imp = class_import_get(obd->u.cli.cl_import);
2781         up_read(&obd->u.cli.cl_sem);
2782         if (!imp)
2783                 RETURN(-ENODEV);
2784
2785         /* We could possibly pass max_age in the request (as an absolute
2786          * timestamp or a "seconds.usec ago") so the target can avoid doing
2787          * extra calls into the filesystem if that isn't necessary (e.g.
2788          * during mount, where that would help a bit).  Having relative timestamps
2789          * is not so great if request processing is slow, while absolute
2790          * timestamps are not ideal because they need time synchronization. */
2791         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2792
2793         class_import_put(imp);
2794
2795         if (req == NULL)
2796                 RETURN(-ENOMEM);
2797
2798         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2799         if (rc) {
2800                 ptlrpc_request_free(req);
2801                 RETURN(rc);
2802         }
2803         ptlrpc_request_set_replen(req);
2804         req->rq_request_portal = OST_CREATE_PORTAL;
2805         ptlrpc_at_set_req_timeout(req);
2806
2807         if (flags & OBD_STATFS_NODELAY) {
2808                 /* procfs requests must not block on statfs, to avoid deadlock */
2809                 req->rq_no_resend = 1;
2810                 req->rq_no_delay = 1;
2811         }
2812
2813         rc = ptlrpc_queue_wait(req);
2814         if (rc)
2815                 GOTO(out, rc);
2816
2817         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2818         if (msfs == NULL)
2819                 GOTO(out, rc = -EPROTO);
2820
2821         *osfs = *msfs;
2822
2823         EXIT;
2824 out:
2825         ptlrpc_req_finished(req);
2826         return rc;
2827 }
2828
2829 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2830                          void *karg, void __user *uarg)
2831 {
2832         struct obd_device *obd = exp->exp_obd;
2833         struct obd_ioctl_data *data = karg;
2834         int err = 0;
2835         ENTRY;
2836
2837         if (!try_module_get(THIS_MODULE)) {
2838                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2839                        module_name(THIS_MODULE));
2840                 return -EINVAL;
2841         }
2842         switch (cmd) {
2843         case OBD_IOC_CLIENT_RECOVER:
2844                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2845                                             data->ioc_inlbuf1, 0);
2846                 if (err > 0)
2847                         err = 0;
2848                 GOTO(out, err);
2849         case IOC_OSC_SET_ACTIVE:
2850                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2851                                                data->ioc_offset);
2852                 GOTO(out, err);
2853         case OBD_IOC_PING_TARGET:
2854                 err = ptlrpc_obd_ping(obd);
2855                 GOTO(out, err);
2856         default:
2857                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2858                        cmd, current_comm());
2859                 GOTO(out, err = -ENOTTY);
2860         }
2861 out:
2862         module_put(THIS_MODULE);
2863         return err;
2864 }
2865
2866 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2867                        u32 keylen, void *key, u32 vallen, void *val,
2868                        struct ptlrpc_request_set *set)
2869 {
2870         struct ptlrpc_request *req;
2871         struct obd_device     *obd = exp->exp_obd;
2872         struct obd_import     *imp = class_exp2cliimp(exp);
2873         char                  *tmp;
2874         int                    rc;
2875         ENTRY;
2876
2877         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2878
2879         if (KEY_IS(KEY_CHECKSUM)) {
2880                 if (vallen != sizeof(int))
2881                         RETURN(-EINVAL);
2882                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2883                 RETURN(0);
2884         }
2885
2886         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2887                 sptlrpc_conf_client_adapt(obd);
2888                 RETURN(0);
2889         }
2890
2891         if (KEY_IS(KEY_FLUSH_CTX)) {
2892                 sptlrpc_import_flush_my_ctx(imp);
2893                 RETURN(0);
2894         }
2895
2896         if (KEY_IS(KEY_CACHE_SET)) {
2897                 struct client_obd *cli = &obd->u.cli;
2898
2899                 LASSERT(cli->cl_cache == NULL); /* only once */
2900                 cli->cl_cache = (struct cl_client_cache *)val;
2901                 cl_cache_incref(cli->cl_cache);
2902                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2903
2904                 /* add this osc to the shared cache's LRU list */
2905                 LASSERT(list_empty(&cli->cl_lru_osc));
2906                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2907                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2908                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2909
2910                 RETURN(0);
2911         }
2912
2913         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2914                 struct client_obd *cli = &obd->u.cli;
2915                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2916                 long target = *(long *)val;
2917
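                /* shrink at most half of this OSC's LRU pages, capped at the
                 * caller's remaining target, then report the pages actually
                 * freed by decrementing that target */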
2918                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2919                 *(long *)val -= nr;
2920                 RETURN(0);
2921         }
2922
2923         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2924                 RETURN(-EINVAL);
2925
2926         /* We pass all other commands directly to OST. Since nobody calls OSC
2927          * methods directly and everybody is supposed to go through LOV, we
2928          * assume LOV has already validated the values for us.
2929          * The only recognised values so far are evict_by_nid and mds_conn.
2930          * Even if something bad gets through, we'd get a -EINVAL from the OST
2931          * anyway. */
2932
2933         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2934                                                 &RQF_OST_SET_GRANT_INFO :
2935                                                 &RQF_OBD_SET_INFO);
2936         if (req == NULL)
2937                 RETURN(-ENOMEM);
2938
2939         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2940                              RCL_CLIENT, keylen);
2941         if (!KEY_IS(KEY_GRANT_SHRINK))
2942                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2943                                      RCL_CLIENT, vallen);
2944         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2945         if (rc) {
2946                 ptlrpc_request_free(req);
2947                 RETURN(rc);
2948         }
2949
2950         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2951         memcpy(tmp, key, keylen);
2952         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2953                                                         &RMF_OST_BODY :
2954                                                         &RMF_SETINFO_VAL);
2955         memcpy(tmp, val, vallen);
2956
2957         if (KEY_IS(KEY_GRANT_SHRINK)) {
2958                 struct osc_grant_args *aa;
2959                 struct obdo *oa;
2960
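                /* stash a private copy of the obdo for the reply callback;
                 * it is expected to be freed in osc_shrink_grant_interpret() */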
2961                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2962                 aa = ptlrpc_req_async_args(req);
2963                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2964                 if (!oa) {
2965                         ptlrpc_req_finished(req);
2966                         RETURN(-ENOMEM);
2967                 }
2968                 *oa = ((struct ost_body *)val)->oa;
2969                 aa->aa_oa = oa;
2970                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2971         }
2972
2973         ptlrpc_request_set_replen(req);
2974         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2975                 LASSERT(set != NULL);
2976                 ptlrpc_set_add_req(set, req);
2977                 ptlrpc_check_set(NULL, set);
2978         } else {
2979                 ptlrpcd_add_req(req);
2980         }
2981
2982         RETURN(0);
2983 }
2984 EXPORT_SYMBOL(osc_set_info_async);
2985
2986 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2987                   struct obd_device *obd, struct obd_uuid *cluuid,
2988                   struct obd_connect_data *data, void *localdata)
2989 {
2990         struct client_obd *cli = &obd->u.cli;
2991
2992         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2993                 long lost_grant;
2994                 long grant;
2995
2996                 spin_lock(&cli->cl_loi_list_lock);
2997                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2998                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2999                         grant += cli->cl_dirty_grant;
3000                 else
3001                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3002                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
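                /* "grant ?: x" is GCC shorthand for "grant ? grant : x", so a
                 * client holding no grant asks for two full-sized BRW RPCs
                 * worth as a starting point */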
3003                 lost_grant = cli->cl_lost_grant;
3004                 cli->cl_lost_grant = 0;
3005                 spin_unlock(&cli->cl_loi_list_lock);
3006
3007                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3008                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3009                        data->ocd_version, data->ocd_grant, lost_grant);
3010         }
3011
3012         RETURN(0);
3013 }
3014 EXPORT_SYMBOL(osc_reconnect);
3015
3016 int osc_disconnect(struct obd_export *exp)
3017 {
3018         struct obd_device *obd = class_exp2obd(exp);
3019         int rc;
3020
3021         rc = client_disconnect_export(exp);
3022          * Initially we put del_shrink_grant before disconnect_export, but that
3023          * caused the following problem when setup (connect) and cleanup
3024          * (disconnect) were tangled together:
3025          * (disconnect) are tangled together.
3026          *      connect p1                     disconnect p2
3027          *   ptlrpc_connect_import
3028          *     ...............               class_manual_cleanup
3029          *                                     osc_disconnect
3030          *                                     del_shrink_grant
3031          *   ptlrpc_connect_interpret
3032          *     osc_init_grant
3033          *   add this client to shrink list
3034          *                                      cleanup_osc
3035          * Bang! The grant shrink thread triggers the shrink. See bug 18662.
3036          */
3037         osc_del_grant_list(&obd->u.cli);
3038         return rc;
3039 }
3040 EXPORT_SYMBOL(osc_disconnect);
3041
3042 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3043                                  struct hlist_node *hnode, void *arg)
3044 {
3045         struct lu_env *env = arg;
3046         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3047         struct ldlm_lock *lock;
3048         struct osc_object *osc = NULL;
3049         ENTRY;
3050
3051         lock_res(res);
3052         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3053                 if (lock->l_ast_data != NULL && osc == NULL) {
3054                         osc = lock->l_ast_data;
3055                         cl_object_get(osc2cl(osc));
3056                 }
3057
3058                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
3059                  * be canceled by the 2nd round of ldlm_namespace_cleanup()
3060                  * in osc_import_event(). */
3061                 ldlm_clear_cleaned(lock);
3062         }
3063         unlock_res(res);
3064
3065         if (osc != NULL) {
3066                 osc_object_invalidate(env, osc);
3067                 cl_object_put(env, osc2cl(osc));
3068         }
3069
3070         RETURN(0);
3071 }
3072 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3073
3074 static int osc_import_event(struct obd_device *obd,
3075                             struct obd_import *imp,
3076                             enum obd_import_event event)
3077 {
3078         struct client_obd *cli;
3079         int rc = 0;
3080
3081         ENTRY;
3082         LASSERT(imp->imp_obd == obd);
3083
3084         switch (event) {
3085         case IMP_EVENT_DISCON: {
3086                 cli = &obd->u.cli;
3087                 spin_lock(&cli->cl_loi_list_lock);
3088                 cli->cl_avail_grant = 0;
3089                 cli->cl_lost_grant = 0;
3090                 spin_unlock(&cli->cl_loi_list_lock);
3091                 break;
3092         }
3093         case IMP_EVENT_INACTIVE: {
3094                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3095                 break;
3096         }
3097         case IMP_EVENT_INVALIDATE: {
3098                 struct ldlm_namespace *ns = obd->obd_namespace;
3099                 struct lu_env         *env;
3100                 __u16                  refcheck;
3101
3102                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3103
3104                 env = cl_env_get(&refcheck);
3105                 if (!IS_ERR(env)) {
3106                         osc_io_unplug(env, &obd->u.cli, NULL);
3107
3108                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3109                                                  osc_ldlm_resource_invalidate,
3110                                                  env, 0);
3111                         cl_env_put(env, &refcheck);
3112
3113                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3114                 } else
3115                         rc = PTR_ERR(env);
3116                 break;
3117         }
3118         case IMP_EVENT_ACTIVE: {
3119                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3120                 break;
3121         }
3122         case IMP_EVENT_OCD: {
3123                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3124
3125                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3126                         osc_init_grant(&obd->u.cli, ocd);
3127
3128                 /* See bug 7198 */
3129                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3130                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3131
3132                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3133                 break;
3134         }
3135         case IMP_EVENT_DEACTIVATE: {
3136                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3137                 break;
3138         }
3139         case IMP_EVENT_ACTIVATE: {
3140                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3141                 break;
3142         }
3143         default:
3144                 CERROR("Unknown import event %d\n", event);
3145                 LBUG();
3146         }
3147         RETURN(rc);
3148 }
3149
3150 /**
3151  * Determine whether the lock can be canceled before replaying the lock
3152  * during recovery, see bug16774 for detailed information.
3153  *
3154  * \retval zero the lock can't be canceled
3155  * \retval other ok to cancel
3156  */
3157 static int osc_cancel_weight(struct ldlm_lock *lock)
3158 {
3159         /*
3160          * Cancel all unused and granted extent locks.
3161          */
3162         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3163             ldlm_is_granted(lock) &&
3164             osc_ldlm_weigh_ast(lock) == 0)
3165                 RETURN(1);
3166
3167         RETURN(0);
3168 }
3169
3170 static int brw_queue_work(const struct lu_env *env, void *data)
3171 {
3172         struct client_obd *cli = data;
3173
3174         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3175
3176         osc_io_unplug(env, cli, NULL);
3177         RETURN(0);
3178 }
3179
3180 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3181 {
3182         struct client_obd *cli = &obd->u.cli;
3183         void *handler;
3184         int rc;
3185
3186         ENTRY;
3187
3188         rc = ptlrpcd_addref();
3189         if (rc)
3190                 RETURN(rc);
3191
3192         rc = client_obd_setup(obd, lcfg);
3193         if (rc)
3194                 GOTO(out_ptlrpcd, rc);
3195
3197         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3198         if (IS_ERR(handler))
3199                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3200         cli->cl_writeback_work = handler;
3201
3202         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3203         if (IS_ERR(handler))
3204                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3205         cli->cl_lru_work = handler;
3206
3207         rc = osc_quota_setup(obd);
3208         if (rc)
3209                 GOTO(out_ptlrpcd_work, rc);
3210
3211         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3212         osc_update_next_shrink(cli);
3213
3214         RETURN(rc);
3215
3216 out_ptlrpcd_work:
3217         if (cli->cl_writeback_work != NULL) {
3218                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3219                 cli->cl_writeback_work = NULL;
3220         }
3221         if (cli->cl_lru_work != NULL) {
3222                 ptlrpcd_destroy_work(cli->cl_lru_work);
3223                 cli->cl_lru_work = NULL;
3224         }
3225         client_obd_cleanup(obd);
3226 out_ptlrpcd:
3227         ptlrpcd_decref();
3228         RETURN(rc);
3229 }
3230 EXPORT_SYMBOL(osc_setup_common);
3231
3232 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3233 {
3234         struct client_obd *cli = &obd->u.cli;
3235         int                adding;
3236         int                added;
3237         int                req_count;
3238         int                rc;
3239
3240         ENTRY;
3241
3242         rc = osc_setup_common(obd, lcfg);
3243         if (rc < 0)
3244                 RETURN(rc);
3245
3246         rc = osc_tunables_init(obd);
3247         if (rc)
3248                 RETURN(rc);
3249
3250         /*
3251          * We try to control the total number of requests with an upper limit
3252          * osc_reqpool_maxreqcount. There might be some race which will cause
3253          * over-limit allocation, but it is fine.
3254          */
3255         req_count = atomic_read(&osc_pool_req_count);
3256         if (req_count < osc_reqpool_maxreqcount) {
3257                 adding = cli->cl_max_rpcs_in_flight + 2;
3258                 if (req_count + adding > osc_reqpool_maxreqcount)
3259                         adding = osc_reqpool_maxreqcount - req_count;
3260
3261                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3262                 atomic_add(added, &osc_pool_req_count);
3263         }
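        /* e.g. (hypothetical numbers) with osc_reqpool_maxreqcount == 80 and
         * req_count == 75, "adding" is clamped from cl_max_rpcs_in_flight + 2
         * down to 5 */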
3264
3265         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3266
3267         spin_lock(&osc_shrink_lock);
3268         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3269         spin_unlock(&osc_shrink_lock);
3270         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3271         cli->cl_import->imp_idle_debug = D_HA;
3272
3273         RETURN(0);
3274 }
3275
3276 int osc_precleanup_common(struct obd_device *obd)
3277 {
3278         struct client_obd *cli = &obd->u.cli;
3279         ENTRY;
3280
3281         /* LU-464
3282          * for echo client, export may be on zombie list, wait for
3283          * zombie thread to cull it, because cli.cl_import will be
3284          * cleared in client_disconnect_export():
3285          *   class_export_destroy() -> obd_cleanup() ->
3286          *   echo_device_free() -> echo_client_cleanup() ->
3287          *   obd_disconnect() -> osc_disconnect() ->
3288          *   client_disconnect_export()
3289          */
3290         obd_zombie_barrier();
3291         if (cli->cl_writeback_work) {
3292                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3293                 cli->cl_writeback_work = NULL;
3294         }
3295
3296         if (cli->cl_lru_work) {
3297                 ptlrpcd_destroy_work(cli->cl_lru_work);
3298                 cli->cl_lru_work = NULL;
3299         }
3300
3301         obd_cleanup_client_import(obd);
3302         RETURN(0);
3303 }
3304 EXPORT_SYMBOL(osc_precleanup_common);
3305
3306 static int osc_precleanup(struct obd_device *obd)
3307 {
3308         ENTRY;
3309
3310         osc_precleanup_common(obd);
3311
3312         ptlrpc_lprocfs_unregister_obd(obd);
3313         RETURN(0);
3314 }
3315
3316 int osc_cleanup_common(struct obd_device *obd)
3317 {
3318         struct client_obd *cli = &obd->u.cli;
3319         int rc;
3320
3321         ENTRY;
3322
3323         spin_lock(&osc_shrink_lock);
3324         list_del(&cli->cl_shrink_list);
3325         spin_unlock(&osc_shrink_lock);
3326
3327         /* lru cleanup */
3328         if (cli->cl_cache != NULL) {
3329                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3330                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3331                 list_del_init(&cli->cl_lru_osc);
3332                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3333                 cli->cl_lru_left = NULL;
3334                 cl_cache_decref(cli->cl_cache);
3335                 cli->cl_cache = NULL;
3336         }
3337
3338         /* free memory of osc quota cache */
3339         osc_quota_cleanup(obd);
3340
3341         rc = client_obd_cleanup(obd);
3342
3343         ptlrpcd_decref();
3344         RETURN(rc);
3345 }
3346 EXPORT_SYMBOL(osc_cleanup_common);
3347
3348 static struct obd_ops osc_obd_ops = {
3349         .o_owner                = THIS_MODULE,
3350         .o_setup                = osc_setup,
3351         .o_precleanup           = osc_precleanup,
3352         .o_cleanup              = osc_cleanup_common,
3353         .o_add_conn             = client_import_add_conn,
3354         .o_del_conn             = client_import_del_conn,
3355         .o_connect              = client_connect_import,
3356         .o_reconnect            = osc_reconnect,
3357         .o_disconnect           = osc_disconnect,
3358         .o_statfs               = osc_statfs,
3359         .o_statfs_async         = osc_statfs_async,
3360         .o_create               = osc_create,
3361         .o_destroy              = osc_destroy,
3362         .o_getattr              = osc_getattr,
3363         .o_setattr              = osc_setattr,
3364         .o_iocontrol            = osc_iocontrol,
3365         .o_set_info_async       = osc_set_info_async,
3366         .o_import_event         = osc_import_event,
3367         .o_quotactl             = osc_quotactl,
3368 };
3369
3370 static struct shrinker *osc_cache_shrinker;
3371 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3372 DEFINE_SPINLOCK(osc_shrink_lock);
3373
3374 #ifndef HAVE_SHRINKER_COUNT
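/* older kernels provide a single shrink() callback rather than separate
 * count_objects()/scan_objects() methods, so emulate it here by running a
 * scan pass and then returning the remaining object count */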
3375 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3376 {
3377         struct shrink_control scv = {
3378                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3379                 .gfp_mask   = shrink_param(sc, gfp_mask)
3380         };
3381 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3382         struct shrinker *shrinker = NULL;
3383 #endif
3384
3385         (void)osc_cache_shrink_scan(shrinker, &scv);
3386
3387         return osc_cache_shrink_count(shrinker, &scv);
3388 }
3389 #endif
3390
3391 static int __init osc_init(void)
3392 {
3393         bool enable_proc = true;
3394         struct obd_type *type;
3395         unsigned int reqpool_size;
3396         unsigned int reqsize;
3397         int rc;
3398         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3399                          osc_cache_shrink_count, osc_cache_shrink_scan);
3400         ENTRY;
3401
3402         /* print an address of _any_ initialized kernel symbol from this
3403          * module, to allow debugging with gdb that doesn't support data
3404          * symbols from modules. */
3405         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3406
3407         rc = lu_kmem_init(osc_caches);
3408         if (rc)
3409                 RETURN(rc);
3410
3411         type = class_search_type(LUSTRE_OSP_NAME);
3412         if (type != NULL && type->typ_procsym != NULL)
3413                 enable_proc = false;
3414
3415         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3416                                  LUSTRE_OSC_NAME, &osc_device_type);
3417         if (rc)
3418                 GOTO(out_kmem, rc);
3419
3420         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3421
3422         /* This is obviously too much memory; we only guard against overflow here */
3423         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3424                 GOTO(out_type, rc = -EINVAL);
3425
3426         reqpool_size = osc_reqpool_mem_max << 20;
3427
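        /* round the request buffer size up to the next power of two */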
3428         reqsize = 1;
3429         while (reqsize < OST_IO_MAXREQSIZE)
3430                 reqsize = reqsize << 1;
3431
3432         /*
3433          * We don't enlarge the request count in OSC pool according to
3434          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3435          * tried after normal allocation fails, so a small OSC pool won't
3436          * cause much performance degradation in most cases.
3437          */
3438         osc_reqpool_maxreqcount = reqpool_size / reqsize;
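        /* worked example with hypothetical sizes: with the default
         * osc_reqpool_mem_max of 5 (MB), if OST_IO_MAXREQSIZE rounds up to a
         * 1MB reqsize the pool is capped at (5 << 20) / (1 << 20) = 5
         * requests */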
3439
3440         atomic_set(&osc_pool_req_count, 0);
3441         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3442                                           ptlrpc_add_rqs_to_pool);
3443
3444         if (osc_rq_pool == NULL)
3445                 GOTO(out_type, rc = -ENOMEM);
3446
3447         rc = osc_start_grant_work();
3448         if (rc != 0)
3449                 GOTO(out_req_pool, rc);
3450
3451         RETURN(rc);
3452
3453 out_req_pool:
3454         ptlrpc_free_rq_pool(osc_rq_pool);
3455 out_type:
3456         class_unregister_type(LUSTRE_OSC_NAME);
3457 out_kmem:
3458         lu_kmem_fini(osc_caches);
3459
3460         RETURN(rc);
3461 }
3462
3463 static void __exit osc_exit(void)
3464 {
3465         osc_stop_grant_work();
3466         remove_shrinker(osc_cache_shrinker);
3467         class_unregister_type(LUSTRE_OSC_NAME);
3468         lu_kmem_fini(osc_caches);
3469         ptlrpc_free_rq_pool(osc_rq_pool);
3470 }
3471
3472 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3473 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3474 MODULE_VERSION(LUSTRE_VERSION_STRING);
3475 MODULE_LICENSE("GPL");
3476
3477 module_init(osc_init);
3478 module_exit(osc_exit);