/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
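
/*
 * Illustrative sketch (not part of this file): a hypothetical caller of
 * osc_setattr_async().  The upcall runs from ptlrpcd context with the cookie
 * once the reply (or an error) has been interpreted; the names below are
 * assumptions for illustration only.
 *
 *      static int my_setattr_upcall(void *cookie, int rc)
 *      {
 *              struct completion *done = cookie;       // hypothetical state
 *
 *              complete(done);                          // wake the waiter
 *              return rc;
 *      }
 *
 *      oa->o_valid |= OBD_MD_FLGROUP;
 *      rc = osc_setattr_async(exp, oa, my_setattr_upcall, &done, PTLRPCD_SET);
 */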

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; the upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
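
/*
 * Illustrative sketch (an assumption, not code from this file): building a
 * single-advice request for osc_ladvise_base().  Field and constant names
 * follow lustre_user.h; the exact values are examples only.
 *
 *      hdr->lah_magic = LADVISE_MAGIC;
 *      hdr->lah_count = 1;
 *      hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *      hdr->lah_advise[0].lla_start = 0;
 *      hdr->lah_advise[0].lla_end = len;
 *      rc = osc_ladvise_base(exp, oa, hdr, upcall, cookie, PTLRPCD_SET);
 */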

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
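
/*
 * Note: osc_punch_send() expects the caller to have packed the punch range
 * into the obdo.  A hedged sketch of the convention used by the OSC I/O
 * layer (an assumption for illustration, not a definition from this file):
 *
 *      oa->o_size = start;             // first byte to punch
 *      oa->o_blocks = OBD_OBJECT_EOF;  // punch through end of object
 *      oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 */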

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
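
/*
 * As the "overload the size and blocks fields" comment above notes, the sync
 * range travels in the obdo.  A hedged sketch of what a caller sets up
 * beforehand (an assumption for illustration):
 *
 *      oa->o_size = start;     // first byte to sync
 *      oa->o_blocks = end;     // last byte, or OBD_OBJECT_EOF
 */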

/* Find and locally cancel locks matching @mode on the resource derived from
 * @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is different from the case where ELC is not supported at all,
         * in which we still want to cancel locks in advance and simply
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
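
/*
 * Worked example of the race handled by the second branch above: thread A
 * increments cl_destroy_in_flight past the limit and must back off.  If a
 * completing destroy decrements the counter between A's atomic_inc_return()
 * and atomic_dec_return(), A's decrement observes a value below the limit
 * and issues the wake_up() itself, so no waiter in osc_destroy() is left
 * sleeping on cl_destroy_waitq.
 */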

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
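
/*
 * Worked example of the o_undirty computation above, assuming 4 KiB pages,
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8:
 *
 *      nrpages = 256 * (8 + 1) = 2304 pages (or cl_dirty_max_pages if larger)
 *      undirty = 2304 << 12 = 9 MiB
 *
 * i.e. the client asks for roughly one full set of in-flight RPCs' worth of
 * grant headroom, padded by the per-extent tax when GRANT_PARAM is set.
 */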

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
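
/*
 * Worked example (4 KiB pages, cl_max_pages_per_rpc = 256,
 * cl_max_rpcs_in_flight = 8): the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB; once cl_avail_grant is already at or below that,
 * the next shrink targets a single RPC's worth, i.e. 1 MiB.
 */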

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the desired limit, and
         * never shrink below a single RPC's worth, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds())
                schedule_delayed_work(&work, msecs_to_jiffies(
                                        (next_shrink - ktime_get_seconds()) *
                                        MSEC_PER_SEC));
        else
                schedule_work(&work.work);
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler that returns grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
               "chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
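
/*
 * Worked example of the chunk alignment above: with PAGE_SHIFT = 12 and
 * ocd_grant_blkbits = 16, cl_chunkbits = 16, a chunk is 16 pages and
 * chunk_mask = ~15.  A cl_max_pages_per_rpc of 100 is then rounded up to
 * (100 + 15) & ~15 = 112 pages, the next multiple of the chunk size.
 */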

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
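
/*
 * Example: nob_read = 5000 across three 4096-byte pages.  Page 0 is consumed
 * whole (nob_read drops to 904), EOF lands inside page 1 so its bytes from
 * offset 904 onward are zeroed, and page 2 is zeroed entirely by the
 * trailing loop.
 */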

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
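
/*
 * Example: two pages with identical flags covering [0, 4096) and
 * [4096, 8192) satisfy p1->off + p1->count == p2->off and are merged into a
 * single niobuf_remote by the caller.  Any flag difference prevents merging;
 * the mask above only controls whether the mismatch is worth a warning.
 */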

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of the DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

1295 static int
1296 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1297                      u32 page_count, struct brw_page **pga,
1298                      struct ptlrpc_request **reqp, int resend)
1299 {
1300         struct ptlrpc_request   *req;
1301         struct ptlrpc_bulk_desc *desc;
1302         struct ost_body         *body;
1303         struct obd_ioobj        *ioobj;
1304         struct niobuf_remote    *niobuf;
1305         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1306         struct osc_brw_async_args *aa;
1307         struct req_capsule      *pill;
1308         struct brw_page *pg_prev;
1309         void *short_io_buf;
1310         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1311
1312         ENTRY;
1313         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1314                 RETURN(-ENOMEM); /* Recoverable */
1315         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1316                 RETURN(-EINVAL); /* Fatal */
1317
1318         if ((cmd & OBD_BRW_WRITE) != 0) {
1319                 opc = OST_WRITE;
1320                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1321                                                 osc_rq_pool,
1322                                                 &RQF_OST_BRW_WRITE);
1323         } else {
1324                 opc = OST_READ;
1325                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1326         }
1327         if (req == NULL)
1328                 RETURN(-ENOMEM);
1329
1330         for (niocount = i = 1; i < page_count; i++) {
1331                 if (!can_merge_pages(pga[i - 1], pga[i]))
1332                         niocount++;
1333         }
1334
1335         pill = &req->rq_pill;
1336         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1337                              sizeof(*ioobj));
1338         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1339                              niocount * sizeof(*niobuf));
1340
1341         for (i = 0; i < page_count; i++)
1342                 short_io_size += pga[i]->count;
1343
1344         /* Check if read/write is small enough to be a short io. */
1345         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1346             !imp_connect_shortio(cli->cl_import))
1347                 short_io_size = 0;
1348
1349         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1350                              opc == OST_READ ? 0 : short_io_size);
1351         if (opc == OST_READ)
1352                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1353                                      short_io_size);
1354
1355         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1356         if (rc) {
1357                 ptlrpc_request_free(req);
1358                 RETURN(rc);
1359         }
1360         osc_set_io_portal(req);
1361
1362         ptlrpc_at_set_req_timeout(req);
1363         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1364          * retry logic */
1365         req->rq_no_retry_einprogress = 1;
1366
1367         if (short_io_size != 0) {
1368                 desc = NULL;
1369                 short_io_buf = NULL;
1370                 goto no_bulk;
1371         }
1372
1373         desc = ptlrpc_prep_bulk_imp(req, page_count,
1374                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1375                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1376                         PTLRPC_BULK_PUT_SINK) |
1377                         PTLRPC_BULK_BUF_KIOV,
1378                 OST_BULK_PORTAL,
1379                 &ptlrpc_bulk_kiov_pin_ops);
1380
1381         if (desc == NULL)
1382                 GOTO(out, rc = -ENOMEM);
1383         /* NB request now owns desc and will free it when it gets freed */
1384 no_bulk:
1385         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1386         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1387         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1388         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1389
1390         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1391
1392         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1393          * and from_kgid(), because they are asynchronous. Fortunately, variable
1394          * oa contains valid o_uid and o_gid in these two operations.
1395          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1396          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1397          * other process logic */
1398         body->oa.o_uid = oa->o_uid;
1399         body->oa.o_gid = oa->o_gid;
1400
1401         obdo_to_ioobj(oa, ioobj);
1402         ioobj->ioo_bufcnt = niocount;
1403         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1404          * that might be send for this request.  The actual number is decided
1405          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1406          * "max - 1" for old client compatibility sending "0", and also so the
1407          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1408         if (desc != NULL)
1409                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1410         else /* short io */
1411                 ioobj_max_brw_set(ioobj, 0);
1412
1413         if (short_io_size != 0) {
1414                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1415                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1416                         body->oa.o_flags = 0;
1417                 }
1418                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1419                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1420                        short_io_size);
1421                 if (opc == OST_WRITE) {
1422                         short_io_buf = req_capsule_client_get(pill,
1423                                                               &RMF_SHORT_IO);
1424                         LASSERT(short_io_buf != NULL);
1425                 }
1426         }
1427
1428         LASSERT(page_count > 0);
1429         pg_prev = pga[0];
1430         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1431                 struct brw_page *pg = pga[i];
1432                 int poff = pg->off & ~PAGE_MASK;
1433
1434                 LASSERT(pg->count > 0);
1435                 /* make sure there is no gap in the middle of page array */
1436                 LASSERTF(page_count == 1 ||
1437                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1438                           ergo(i > 0 && i < page_count - 1,
1439                                poff == 0 && pg->count == PAGE_SIZE)   &&
1440                           ergo(i == page_count - 1, poff == 0)),
1441                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1442                          i, page_count, pg, pg->off, pg->count);
1443                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1444                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1445                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1446                          i, page_count,
1447                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1448                          pg_prev->pg, page_private(pg_prev->pg),
1449                          pg_prev->pg->index, pg_prev->off);
1450                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1451                         (pg->flag & OBD_BRW_SRVLOCK));
1452                 if (short_io_size != 0 && opc == OST_WRITE) {
1453                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1454
1455                         LASSERT(short_io_size >= requested_nob + pg->count);
1456                         memcpy(short_io_buf + requested_nob,
1457                                ptr + poff,
1458                                pg->count);
1459                         ll_kunmap_atomic(ptr, KM_USER0);
1460                 } else if (short_io_size == 0) {
1461                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1462                                                          pg->count);
1463                 }
1464                 requested_nob += pg->count;
1465
1466                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1467                         niobuf--;
1468                         niobuf->rnb_len += pg->count;
1469                 } else {
1470                         niobuf->rnb_offset = pg->off;
1471                         niobuf->rnb_len    = pg->count;
1472                         niobuf->rnb_flags  = pg->flag;
1473                 }
1474                 pg_prev = pg;
1475         }
1476
1477         LASSERTF((void *)(niobuf - niocount) ==
1478                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1479                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1480                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1481
1482         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1483         if (resend) {
1484                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1485                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1486                         body->oa.o_flags = 0;
1487                 }
1488                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1489         }
1490
1491         if (osc_should_shrink_grant(cli))
1492                 osc_shrink_grant_local(cli, &body->oa);
1493
1494         /* size[REQ_REC_OFF] is still sizeof(*body) */
1495         if (opc == OST_WRITE) {
1496                 if (cli->cl_checksum &&
1497                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1498                         /* store cl_cksum_type in a local variable since
1499                          * it can be changed via lprocfs */
1500                         enum cksum_types cksum_type = cli->cl_cksum_type;
1501
1502                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1503                                 body->oa.o_flags = 0;
1504
1505                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1506                                                                 cksum_type);
1507                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1508
1509                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1510                                                   requested_nob, page_count,
1511                                                   pga, OST_WRITE,
1512                                                   &body->oa.o_cksum);
1513                         if (rc < 0) {
1514                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1515                                        rc);
1516                                 GOTO(out, rc);
1517                         }
1518                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1519                                body->oa.o_cksum);
1520
1521                         /* save this in 'oa', too, for later checking */
1522                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1523                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1524                                                            cksum_type);
1525                 } else {
1526                         /* clear out the checksum flag, in case this is a
1527                          * resend but cl_checksum is no longer set. b=11238 */
1528                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1529                 }
1530                 oa->o_cksum = body->oa.o_cksum;
1531                 /* 1 RC per niobuf */
1532                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1533                                      sizeof(__u32) * niocount);
1534         } else {
1535                 if (cli->cl_checksum &&
1536                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1537                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1538                                 body->oa.o_flags = 0;
1539                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1540                                 cli->cl_cksum_type);
1541                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1542                 }
1543
1544                 /* Client cksum has already been copied to the wire obdo in the
1545                  * previous lustre_set_wire_obdo(), and in case a bulk-read is
1546                  * being resent due to a cksum error, this will allow the server
1547                  * to check+dump pages on its side */
1548         }
1549         ptlrpc_request_set_replen(req);
1550
1551         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1552         aa = ptlrpc_req_async_args(req);
1553         aa->aa_oa = oa;
1554         aa->aa_requested_nob = requested_nob;
1555         aa->aa_nio_count = niocount;
1556         aa->aa_page_count = page_count;
1557         aa->aa_resends = 0;
1558         aa->aa_ppga = pga;
1559         aa->aa_cli = cli;
1560         INIT_LIST_HEAD(&aa->aa_oaps);
1561
1562         *reqp = req;
1563         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1564         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1565                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1566                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1567         RETURN(0);
1568
1569  out:
1570         ptlrpc_req_finished(req);
1571         RETURN(rc);
1572 }
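
/*
 * A minimal userspace sketch -- an illustration, not Lustre code -- of the
 * remote-niobuf coalescing done in the page loop above: when the previous
 * fragment ends exactly where the next one begins (and the BRW flags agree,
 * as can_merge_pages() requires), the two entries collapse into one.  The
 * "struct nb" type and the function names here are hypothetical.
 */
#include <stddef.h>

struct nb {
        unsigned long long off;
        unsigned int       len;
        unsigned int       flags;
};

static int nb_can_merge(const struct nb *prev, const struct nb *next)
{
        return prev->flags == next->flags &&
               prev->off + prev->len == next->off;
}

/* coalesce in place; returns the number of surviving entries, e.g.
 * {0,4096} {4096,4096} {12288,4096} -> {0,8192} {12288,4096} == 2 */
static size_t nb_coalesce(struct nb *nbs, size_t count)
{
        size_t out = 0, i;

        if (count == 0)
                return 0;
        for (i = 1; i < count; i++) {
                if (nb_can_merge(&nbs[out], &nbs[i]))
                        nbs[out].len += nbs[i].len;     /* extend previous */
                else
                        nbs[++out] = nbs[i];            /* start a new one */
        }
        return out + 1;
}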
1573
1574 char dbgcksum_file_name[PATH_MAX];
1575
1576 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1577                                 struct brw_page **pga, __u32 server_cksum,
1578                                 __u32 client_cksum)
1579 {
1580         struct file *filp;
1581         int rc, i;
1582         unsigned int len;
1583         char *buf;
1584
1585         /* only keep a dump of the pages from the first error for the same
1586          * range in the file/fid, not from the resends/retries. */
1587         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1588                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1589                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1590                   libcfs_debug_file_path_arr :
1591                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1592                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1593                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1594                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1595                  pga[0]->off,
1596                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1597                  client_cksum, server_cksum);
1598         filp = filp_open(dbgcksum_file_name,
1599                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1600         if (IS_ERR(filp)) {
1601                 rc = PTR_ERR(filp);
1602                 if (rc == -EEXIST)
1603                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1604                                "checksum error: rc = %d\n", dbgcksum_file_name,
1605                                rc);
1606                 else
1607                         CERROR("%s: can't open to dump pages with checksum "
1608                                "error: rc = %d\n", dbgcksum_file_name, rc);
1609                 return;
1610         }
1611
1612         for (i = 0; i < page_count; i++) {
1613                 len = pga[i]->count;
1614                 buf = kmap(pga[i]->pg);
1615                 while (len != 0) {
1616                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1617                         if (rc < 0) {
1618                                 CERROR("%s: wanted to write %u bytes but got"
1619                                        " error %d\n", dbgcksum_file_name, len, rc);
1620                                 break;
1621                         }
1622                         len -= rc;
1623                         buf += rc;
1624                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1625                                dbgcksum_file_name, rc);
1626                 }
1627                 kunmap(pga[i]->pg);
1628         }
1629
1630         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1631         if (rc)
1632                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1633         filp_close(filp, NULL);
1634         return;
1635 }
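
/*
 * Userspace analogue (a sketch under POSIX assumptions, not Lustre code) of
 * the partial-write loop above: like cfs_kernel_write(), write(2) may
 * transfer fewer bytes than requested, so the caller must advance the buffer
 * and retry until the length is drained or an error is returned.
 */
#include <unistd.h>
#include <errno.h>

static int write_all(int fd, const char *buf, size_t len)
{
        while (len != 0) {
                ssize_t rc = write(fd, buf, len);

                if (rc < 0) {
                        if (errno == EINTR)
                                continue;       /* retry interrupted write */
                        return -errno;
                }
                len -= rc;      /* partial write: advance and keep going */
                buf += rc;
        }
        return 0;
}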
1636
1637 static int
1638 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1639                      __u32 client_cksum, __u32 server_cksum,
1640                      struct osc_brw_async_args *aa)
1641 {
1642         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1643         enum cksum_types cksum_type;
1644         obd_dif_csum_fn *fn = NULL;
1645         int sector_size = 0;
1646         __u32 new_cksum;
1647         char *msg;
1648         int rc;
1649
1650         if (server_cksum == client_cksum) {
1651                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1652                 return 0;
1653         }
1654
1655         if (aa->aa_cli->cl_checksum_dump)
1656                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1657                                     server_cksum, client_cksum);
1658
1659         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1660                                            oa->o_flags : 0);
1661
1662         switch (cksum_type) {
1663         case OBD_CKSUM_T10IP512:
1664                 fn = obd_dif_ip_fn;
1665                 sector_size = 512;
1666                 break;
1667         case OBD_CKSUM_T10IP4K:
1668                 fn = obd_dif_ip_fn;
1669                 sector_size = 4096;
1670                 break;
1671         case OBD_CKSUM_T10CRC512:
1672                 fn = obd_dif_crc_fn;
1673                 sector_size = 512;
1674                 break;
1675         case OBD_CKSUM_T10CRC4K:
1676                 fn = obd_dif_crc_fn;
1677                 sector_size = 4096;
1678                 break;
1679         default:
1680                 break;
1681         }
1682
1683         if (fn)
1684                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1685                                              aa->aa_page_count, aa->aa_ppga,
1686                                              OST_WRITE, fn, sector_size,
1687                                              &new_cksum);
1688         else
1689                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1690                                        aa->aa_ppga, OST_WRITE, cksum_type,
1691                                        &new_cksum);
1692
1693         if (rc < 0)
1694                 msg = "failed to calculate the client write checksum";
1695         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1696                 msg = "the server did not use the checksum type specified in "
1697                       "the original request - likely a protocol problem";
1698         else if (new_cksum == server_cksum)
1699                 msg = "changed on the client after we checksummed it - "
1700                       "likely false positive due to mmap IO (bug 11742)";
1701         else if (new_cksum == client_cksum)
1702                 msg = "changed in transit before arrival at OST";
1703         else
1704                 msg = "changed in transit AND doesn't match the original - "
1705                       "likely false positive due to mmap IO (bug 11742)";
1706
1707         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1708                            DFID " object "DOSTID" extent [%llu-%llu], original "
1709                            "client csum %x (type %x), server csum %x (type %x),"
1710                            " client csum now %x\n",
1711                            obd_name, msg, libcfs_nid2str(peer->nid),
1712                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1713                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1714                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1715                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1716                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1717                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1718                            client_cksum,
1719                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1720                            server_cksum, cksum_type, new_cksum);
1721         return 1;
1722 }
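
/*
 * A compact restatement (illustrative only; names are hypothetical) of the
 * triage above: comparing the checksum the client computed at send time, the
 * server's checksum on receipt, and a fresh client recomputation localizes
 * where the data changed.
 */
static const char *cksum_triage(unsigned int cli_orig, unsigned int srv,
                                unsigned int cli_now)
{
        if (cli_now == srv)
                return "page changed on client after the original checksum";
        if (cli_now == cli_orig)
                return "data changed in transit before arrival at the OST";
        return "changed in transit and no longer matches the original";
}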
1723
1724 /* Note: rc enters this function as the number of bytes transferred */
1725 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1726 {
1727         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1728         struct client_obd *cli = aa->aa_cli;
1729         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1730         const struct lnet_process_id *peer =
1731                 &req->rq_import->imp_connection->c_peer;
1732         struct ost_body *body;
1733         u32 client_cksum = 0;
1734         ENTRY;
1735
1736         if (rc < 0 && rc != -EDQUOT) {
1737                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1738                 RETURN(rc);
1739         }
1740
1741         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1742         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1743         if (body == NULL) {
1744                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1745                 RETURN(-EPROTO);
1746         }
1747
1748         /* set/clear over quota flag for a uid/gid/projid */
1749         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1750             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1751                 unsigned qid[LL_MAXQUOTAS] = {
1752                                          body->oa.o_uid, body->oa.o_gid,
1753                                          body->oa.o_projid };
1754                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1755                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1756                        body->oa.o_valid, body->oa.o_flags);
1757                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1758                                 body->oa.o_flags);
1759         }
1760
1761         osc_update_grant(cli, body);
1762
1763         if (rc < 0)
1764                 RETURN(rc);
1765
1766         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1767                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1768
1769         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1770                 if (rc > 0) {
1771                         CERROR("Unexpected +ve rc %d\n", rc);
1772                         RETURN(-EPROTO);
1773                 }
1774
1775                 if (req->rq_bulk != NULL &&
1776                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1777                         RETURN(-EAGAIN);
1778
1779                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1780                     check_write_checksum(&body->oa, peer, client_cksum,
1781                                          body->oa.o_cksum, aa))
1782                         RETURN(-EAGAIN);
1783
1784                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1785                                      aa->aa_page_count, aa->aa_ppga);
1786                 GOTO(out, rc);
1787         }
1788
1789         /* The rest of this function executes only for OST_READs */
1790
1791         if (req->rq_bulk == NULL) {
1792                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1793                                           RCL_SERVER);
1794                 LASSERT(rc == req->rq_status);
1795         } else {
1796                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1797                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1798         }
1799         if (rc < 0)
1800                 GOTO(out, rc = -EAGAIN);
1801
1802         if (rc > aa->aa_requested_nob) {
1803                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1804                        aa->aa_requested_nob);
1805                 RETURN(-EPROTO);
1806         }
1807
1808         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1809                 CERROR("Unexpected rc %d (%d transferred)\n",
1810                        rc, req->rq_bulk->bd_nob_transferred);
1811                 RETURN(-EPROTO);
1812         }
1813
1814         if (req->rq_bulk == NULL) {
1815                 /* short io */
1816                 int nob, pg_count, i = 0;
1817                 unsigned char *buf;
1818
1819                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1820                 pg_count = aa->aa_page_count;
1821                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1822                                                    rc);
1823                 nob = rc;
1824                 while (nob > 0 && pg_count > 0) {
1825                         unsigned char *ptr;
1826                         int count = aa->aa_ppga[i]->count > nob ?
1827                                     nob : aa->aa_ppga[i]->count;
1828
1829                         CDEBUG(D_CACHE, "page %p count %d\n",
1830                                aa->aa_ppga[i]->pg, count);
1831                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1832                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1833                                count);
1834                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1835
1836                         buf += count;
1837                         nob -= count;
1838                         i++;
1839                         pg_count--;
1840                 }
1841         }
1842
1843         if (rc < aa->aa_requested_nob)
1844                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1845
1846         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1847                 static int cksum_counter;
1848                 u32        server_cksum = body->oa.o_cksum;
1849                 char      *via = "";
1850                 char      *router = "";
1851                 enum cksum_types cksum_type;
1852                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1853                         body->oa.o_flags : 0;
1854
1855                 cksum_type = obd_cksum_type_unpack(o_flags);
1856                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1857                                           aa->aa_page_count, aa->aa_ppga,
1858                                           OST_READ, &client_cksum);
1859                 if (rc < 0)
1860                         GOTO(out, rc);
1861
1862                 if (req->rq_bulk != NULL &&
1863                     peer->nid != req->rq_bulk->bd_sender) {
1864                         via = " via ";
1865                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1866                 }
1867
1868                 if (server_cksum != client_cksum) {
1869                         struct ost_body *clbody;
1870                         u32 page_count = aa->aa_page_count;
1871
1872                         clbody = req_capsule_client_get(&req->rq_pill,
1873                                                         &RMF_OST_BODY);
1874                         if (cli->cl_checksum_dump)
1875                                 dump_all_bulk_pages(&clbody->oa, page_count,
1876                                                     aa->aa_ppga, server_cksum,
1877                                                     client_cksum);
1878
1879                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1880                                            "%s%s%s inode "DFID" object "DOSTID
1881                                            " extent [%llu-%llu], client %x, "
1882                                            "server %x, cksum_type %x\n",
1883                                            obd_name,
1884                                            libcfs_nid2str(peer->nid),
1885                                            via, router,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_seq : 0ULL,
1888                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1889                                                 clbody->oa.o_parent_oid : 0,
1890                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1891                                                 clbody->oa.o_parent_ver : 0,
1892                                            POSTID(&body->oa.o_oi),
1893                                            aa->aa_ppga[0]->off,
1894                                            aa->aa_ppga[page_count-1]->off +
1895                                            aa->aa_ppga[page_count-1]->count - 1,
1896                                            client_cksum, server_cksum,
1897                                            cksum_type);
1898                         cksum_counter = 0;
1899                         aa->aa_oa->o_cksum = client_cksum;
1900                         rc = -EAGAIN;
1901                 } else {
1902                         cksum_counter++;
1903                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1904                         rc = 0;
1905                 }
1906         } else if (unlikely(client_cksum)) {
1907                 static int cksum_missed;
1908
1909                 cksum_missed++;
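                /* rate-limit: x & -x == x holds exactly when x is a power of
                 * two, so this fires on the 1st, 2nd, 4th, 8th, ... miss */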
1910                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1911                         CERROR("Checksum %u requested from %s but not sent\n",
1912                                cksum_missed, libcfs_nid2str(peer->nid));
1913         } else {
1914                 rc = 0;
1915         }
1916 out:
1917         if (rc >= 0)
1918                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1919                                      aa->aa_oa, &body->oa);
1920
1921         RETURN(rc);
1922 }
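
/*
 * A standalone sketch (hypothetical types, not Lustre code) of the short-io
 * read path above: the reply carries the data inline in one contiguous
 * buffer, which is scattered into the per-page destinations until either
 * the buffer or the page list runs out.
 */
#include <string.h>

struct frag {
        char   *dst;            /* mapped destination page + offset */
        size_t  count;          /* bytes wanted in this fragment */
};

static size_t scatter_inline_reply(const char *buf, size_t nob,
                                   struct frag *frags, size_t nfrags)
{
        size_t copied = 0, i;

        for (i = 0; nob > 0 && i < nfrags; i++) {
                size_t count = frags[i].count < nob ? frags[i].count : nob;

                memcpy(frags[i].dst, buf + copied, count);
                copied += count;
                nob -= count;
        }
        return copied;          /* bytes actually placed */
}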
1923
1924 static int osc_brw_redo_request(struct ptlrpc_request *request,
1925                                 struct osc_brw_async_args *aa, int rc)
1926 {
1927         struct ptlrpc_request *new_req;
1928         struct osc_brw_async_args *new_aa;
1929         struct osc_async_page *oap;
1930         ENTRY;
1931
1932         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1933                   "redo for recoverable error %d", rc);
1934
1935         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1936                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1937                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1938                                   aa->aa_ppga, &new_req, 1);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1943                 if (oap->oap_request != NULL) {
1944                         LASSERTF(request == oap->oap_request,
1945                                  "request %p != oap_request %p\n",
1946                                  request, oap->oap_request);
1947                         if (oap->oap_interrupted) {
1948                                 ptlrpc_req_finished(new_req);
1949                                 RETURN(-EINTR);
1950                         }
1951                 }
1952         }
1953         /*
1954          * New request takes over pga and oaps from old request.
1955          * Note that copying a list_head doesn't work, need to move it...
1956          */
1957         aa->aa_resends++;
1958         new_req->rq_interpret_reply = request->rq_interpret_reply;
1959         new_req->rq_async_args = request->rq_async_args;
1960         new_req->rq_commit_cb = request->rq_commit_cb;
1961         /* cap resend delay to the current request timeout, this is similar to
1962          * what ptlrpc does (see after_reply()) */
1963         if (aa->aa_resends > new_req->rq_timeout)
1964                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1965         else
1966                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1967         new_req->rq_generation_set = 1;
1968         new_req->rq_import_generation = request->rq_import_generation;
1969
1970         new_aa = ptlrpc_req_async_args(new_req);
1971
1972         INIT_LIST_HEAD(&new_aa->aa_oaps);
1973         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1974         INIT_LIST_HEAD(&new_aa->aa_exts);
1975         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1976         new_aa->aa_resends = aa->aa_resends;
1977
1978         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1979                 if (oap->oap_request) {
1980                         ptlrpc_req_finished(oap->oap_request);
1981                         oap->oap_request = ptlrpc_request_addref(new_req);
1982                 }
1983         }
1984
1985         /* XXX: This code will run into problems if we ever support adding
1986          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1987          * waiting for all of them to finish. We should inherit the request
1988          * set from the old request. */
1989         ptlrpcd_add_req(new_req);
1990
1991         DEBUG_REQ(D_INFO, new_req, "new request");
1992         RETURN(0);
1993 }
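
/*
 * The resend delay policy above, distilled (an illustration; the real code
 * stamps new_req->rq_sent directly): each retry is delayed by one second per
 * resend so far, capped at the request timeout, mirroring what ptlrpc itself
 * does in after_reply().
 */
#include <time.h>

static time_t resend_deadline(time_t now, unsigned int resends,
                              unsigned int timeout)
{
        return now + (resends > timeout ? timeout : resends);
}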
1994
1995 /*
1996  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1997  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1998  * fine for our small page arrays and doesn't require allocation.  It's an
1999  * insertion sort that swaps elements that are strides apart, shrinking the
2000  * stride down until it's '1' and the array is sorted.
2001  */
2002 static void sort_brw_pages(struct brw_page **array, int num)
2003 {
2004         int stride, i, j;
2005         struct brw_page *tmp;
2006
2007         if (num == 1)
2008                 return;
2009         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2010                 ;
2011
2012         do {
2013                 stride /= 3;
2014                 for (i = stride ; i < num ; i++) {
2015                         tmp = array[i];
2016                         j = i;
2017                         while (j >= stride && array[j - stride]->off > tmp->off) {
2018                                 array[j] = array[j - stride];
2019                                 j -= stride;
2020                         }
2021                         array[j] = tmp;
2022                 }
2023         } while (stride > 1);
2024 }
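
/*
 * The same shellsort, shown standalone on plain integers (illustrative
 * only): the stride sequence 1, 4, 13, 40, ... is grown past the array
 * length with stride = 3 * stride + 1 and then shrunk by thirds, exactly
 * as in sort_brw_pages() above.
 */
static void shellsort_ints(int *a, int num)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < num; stride = stride * 3 + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
        /* e.g. {5, 1, 4, 2, 3} sorts to {1, 2, 3, 4, 5} */
}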
2025
2026 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2027 {
2028         LASSERT(ppga != NULL);
2029         OBD_FREE(ppga, sizeof(*ppga) * count);
2030 }
2031
2032 static int brw_interpret(const struct lu_env *env,
2033                          struct ptlrpc_request *req, void *args, int rc)
2034 {
2035         struct osc_brw_async_args *aa = args;
2036         struct osc_extent *ext;
2037         struct osc_extent *tmp;
2038         struct client_obd *cli = aa->aa_cli;
2039         unsigned long transferred = 0;
2040
2041         ENTRY;
2042
2043         rc = osc_brw_fini_request(req, rc);
2044         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2045         /*
2046          * When server returns -EINPROGRESS, client should always retry
2047          * regardless of the number of times the bulk was resent already.
2048          */
2049         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2050                 if (req->rq_import_generation !=
2051                     req->rq_import->imp_generation) {
2052                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2053                                ""DOSTID", rc = %d.\n",
2054                                req->rq_import->imp_obd->obd_name,
2055                                POSTID(&aa->aa_oa->o_oi), rc);
2056                 } else if (rc == -EINPROGRESS ||
2057                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2058                         rc = osc_brw_redo_request(req, aa, rc);
2059                 } else {
2060                         CERROR("%s: too many resend retries for object: "
2061                                "%llu:%llu, rc = %d.\n",
2062                                req->rq_import->imp_obd->obd_name,
2063                                POSTID(&aa->aa_oa->o_oi), rc);
2064                 }
2065
2066                 if (rc == 0)
2067                         RETURN(0);
2068                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2069                         rc = -EIO;
2070         }
2071
2072         if (rc == 0) {
2073                 struct obdo *oa = aa->aa_oa;
2074                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2075                 unsigned long valid = 0;
2076                 struct cl_object *obj;
2077                 struct osc_async_page *last;
2078
2079                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2080                 obj = osc2cl(last->oap_obj);
2081
2082                 cl_object_attr_lock(obj);
2083                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2084                         attr->cat_blocks = oa->o_blocks;
2085                         valid |= CAT_BLOCKS;
2086                 }
2087                 if (oa->o_valid & OBD_MD_FLMTIME) {
2088                         attr->cat_mtime = oa->o_mtime;
2089                         valid |= CAT_MTIME;
2090                 }
2091                 if (oa->o_valid & OBD_MD_FLATIME) {
2092                         attr->cat_atime = oa->o_atime;
2093                         valid |= CAT_ATIME;
2094                 }
2095                 if (oa->o_valid & OBD_MD_FLCTIME) {
2096                         attr->cat_ctime = oa->o_ctime;
2097                         valid |= CAT_CTIME;
2098                 }
2099
2100                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2101                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2102                         loff_t last_off = last->oap_count + last->oap_obj_off +
2103                                 last->oap_page_off;
2104
2105                         /* Change the file size if this is an out-of-quota or
2106                          * direct IO write and it extends the file size */
2107                         if (loi->loi_lvb.lvb_size < last_off) {
2108                                 attr->cat_size = last_off;
2109                                 valid |= CAT_SIZE;
2110                         }
2111                         /* Extend KMS if it's not a lockless write */
2112                         if (loi->loi_kms < last_off &&
2113                             oap2osc_page(last)->ops_srvlock == 0) {
2114                                 attr->cat_kms = last_off;
2115                                 valid |= CAT_KMS;
2116                         }
2117                 }
2118
2119                 if (valid != 0)
2120                         cl_object_attr_update(env, obj, attr, valid);
2121                 cl_object_attr_unlock(obj);
2122         }
2123         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2124
2125         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2126                 osc_inc_unstable_pages(req);
2127
2128         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2129                 list_del_init(&ext->oe_link);
2130                 osc_extent_finish(env, ext, 1,
2131                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2132         }
2133         LASSERT(list_empty(&aa->aa_exts));
2134         LASSERT(list_empty(&aa->aa_oaps));
2135
2136         transferred = (req->rq_bulk == NULL ? /* short io */
2137                        aa->aa_requested_nob :
2138                        req->rq_bulk->bd_nob_transferred);
2139
2140         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2141         ptlrpc_lprocfs_brw(req, transferred);
2142
2143         spin_lock(&cli->cl_loi_list_lock);
2144         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2145          * is called so we know whether to go to sync BRWs or wait for more
2146          * RPCs to complete */
2147         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2148                 cli->cl_w_in_flight--;
2149         else
2150                 cli->cl_r_in_flight--;
2151         osc_wake_cache_waiters(cli);
2152         spin_unlock(&cli->cl_loi_list_lock);
2153
2154         osc_io_unplug(env, cli, NULL);
2155         RETURN(rc);
2156 }
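
/*
 * A distilled sketch (hypothetical struct; the real code updates cl_attr
 * under cl_object_attr_lock) of the post-write size handling above: the
 * file size and the known minimum size (KMS) are only ever extended by a
 * completed write, and KMS is left untouched for server-locked (lockless)
 * writes.
 */
struct size_state {
        unsigned long long size;
        unsigned long long kms;
};

static void post_write_extend(struct size_state *s,
                              unsigned long long last_off, int srvlock)
{
        if (s->size < last_off)
                s->size = last_off;     /* the write grew the file */
        if (!srvlock && s->kms < last_off)
                s->kms = last_off;      /* extend KMS for locked I/O only */
}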
2157
2158 static void brw_commit(struct ptlrpc_request *req)
2159 {
2160         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2161          * this function, which is called via rq_commit_cb, we need to
2162          * ensure osc_dec_unstable_pages is still called. Otherwise
2163          * unstable pages may be leaked. */
2164         spin_lock(&req->rq_lock);
2165         if (likely(req->rq_unstable)) {
2166                 req->rq_unstable = 0;
2167                 spin_unlock(&req->rq_lock);
2168
2169                 osc_dec_unstable_pages(req);
2170         } else {
2171                 req->rq_committed = 1;
2172                 spin_unlock(&req->rq_lock);
2173         }
2174 }
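
/*
 * A pthread sketch (illustrative, not Lustre locking) of the handoff in
 * brw_commit() above: the commit callback and the extent-finish path each
 * test the flags under the same lock, so exactly one of them performs the
 * unstable-page decrement and no accounting is leaked.
 */
#include <pthread.h>
#include <stdbool.h>

struct creq {
        pthread_mutex_t lock;
        bool unstable;          /* set by the inc_unstable path */
        bool committed;         /* set here if inc has not run yet */
};

static void commit_cb(struct creq *r, void (*dec_unstable)(struct creq *))
{
        pthread_mutex_lock(&r->lock);
        if (r->unstable) {
                r->unstable = false;
                pthread_mutex_unlock(&r->lock);
                dec_unstable(r);        /* inc already ran: balance it */
        } else {
                r->committed = true;    /* inc will see this and skip */
                pthread_mutex_unlock(&r->lock);
        }
}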
2175
2176 /**
2177  * Build an RPC from the list of extents @ext_list. The caller must ensure
2178  * that the total number of pages in this list does NOT exceed the maximum
2179  * pages per RPC. Extents in the list must be in OES_RPC state.
2180  */
2181 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2182                   struct list_head *ext_list, int cmd)
2183 {
2184         struct ptlrpc_request           *req = NULL;
2185         struct osc_extent               *ext;
2186         struct brw_page                 **pga = NULL;
2187         struct osc_brw_async_args       *aa = NULL;
2188         struct obdo                     *oa = NULL;
2189         struct osc_async_page           *oap;
2190         struct osc_object               *obj = NULL;
2191         struct cl_req_attr              *crattr = NULL;
2192         loff_t                          starting_offset = OBD_OBJECT_EOF;
2193         loff_t                          ending_offset = 0;
2194         int                             mpflag = 0;
2195         int                             mem_tight = 0;
2196         int                             page_count = 0;
2197         bool                            soft_sync = false;
2198         bool                            interrupted = false;
2199         bool                            ndelay = false;
2200         int                             i;
2201         int                             grant = 0;
2202         int                             rc;
2203         __u32                           layout_version = 0;
2204         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2205         struct ost_body                 *body;
2206         ENTRY;
2207         LASSERT(!list_empty(ext_list));
2208
2209         /* add pages into rpc_list to build BRW rpc */
2210         list_for_each_entry(ext, ext_list, oe_link) {
2211                 LASSERT(ext->oe_state == OES_RPC);
2212                 mem_tight |= ext->oe_memalloc;
2213                 grant += ext->oe_grants;
2214                 page_count += ext->oe_nr_pages;
2215                 layout_version = MAX(layout_version, ext->oe_layout_version);
2216                 if (obj == NULL)
2217                         obj = ext->oe_obj;
2218         }
2219
2220         soft_sync = osc_over_unstable_soft_limit(cli);
2221         if (mem_tight)
2222                 mpflag = cfs_memory_pressure_get_and_set();
2223
2224         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2225         if (pga == NULL)
2226                 GOTO(out, rc = -ENOMEM);
2227
2228         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2229         if (oa == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         i = 0;
2233         list_for_each_entry(ext, ext_list, oe_link) {
2234                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2235                         if (mem_tight)
2236                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2237                         if (soft_sync)
2238                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2239                         pga[i] = &oap->oap_brw_page;
2240                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2241                         i++;
2242
2243                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2244                         if (starting_offset == OBD_OBJECT_EOF ||
2245                             starting_offset > oap->oap_obj_off)
2246                                 starting_offset = oap->oap_obj_off;
2247                         else
2248                                 LASSERT(oap->oap_page_off == 0);
2249                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2250                                 ending_offset = oap->oap_obj_off +
2251                                                 oap->oap_count;
2252                         else
2253                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2254                                         PAGE_SIZE);
2255                         if (oap->oap_interrupted)
2256                                 interrupted = true;
2257                 }
2258                 if (ext->oe_ndelay)
2259                         ndelay = true;
2260         }
2261
2262         /* first page in the list */
2263         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2264
2265         crattr = &osc_env_info(env)->oti_req_attr;
2266         memset(crattr, 0, sizeof(*crattr));
2267         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2268         crattr->cra_flags = ~0ULL;
2269         crattr->cra_page = oap2cl_page(oap);
2270         crattr->cra_oa = oa;
2271         cl_req_attr_set(env, osc2cl(obj), crattr);
2272
2273         if (cmd == OBD_BRW_WRITE) {
2274                 oa->o_grant_used = grant;
2275                 if (layout_version > 0) {
2276                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2277                                PFID(&oa->o_oi.oi_fid), layout_version);
2278
2279                         oa->o_layout_version = layout_version;
2280                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2281                 }
2282         }
2283
2284         sort_brw_pages(pga, page_count);
2285         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2286         if (rc != 0) {
2287                 CERROR("prep_req failed: %d\n", rc);
2288                 GOTO(out, rc);
2289         }
2290
2291         req->rq_commit_cb = brw_commit;
2292         req->rq_interpret_reply = brw_interpret;
2293         req->rq_memalloc = mem_tight != 0;
2294         oap->oap_request = ptlrpc_request_addref(req);
2295         if (interrupted && !req->rq_intr)
2296                 ptlrpc_mark_interrupted(req);
2297         if (ndelay) {
2298                 req->rq_no_resend = req->rq_no_delay = 1;
2299                 /* We should probably set a shorter timeout value here
2300                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2301                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2302         }
2303
2304         /* Need to update the timestamps after the request is built in case
2305          * we race with setattr (locally or in queue at the OST).  If the OST
2306          * gets a later setattr before an earlier BRW (as determined by the
2307          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2308          * is no obvious way to do this in a single call.  bug 10150 */
2309         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2310         crattr->cra_oa = &body->oa;
2311         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2312         cl_req_attr_set(env, osc2cl(obj), crattr);
2313         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2314
2315         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2316         aa = ptlrpc_req_async_args(req);
2317         INIT_LIST_HEAD(&aa->aa_oaps);
2318         list_splice_init(&rpc_list, &aa->aa_oaps);
2319         INIT_LIST_HEAD(&aa->aa_exts);
2320         list_splice_init(ext_list, &aa->aa_exts);
2321
2322         spin_lock(&cli->cl_loi_list_lock);
2323         starting_offset >>= PAGE_SHIFT;
2324         if (cmd == OBD_BRW_READ) {
2325                 cli->cl_r_in_flight++;
2326                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329                                       starting_offset + 1);
2330         } else {
2331                 cli->cl_w_in_flight++;
2332                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2333                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2334                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335                                       starting_offset + 1);
2336         }
2337         spin_unlock(&cli->cl_loi_list_lock);
2338
2339         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2340                   page_count, aa, cli->cl_r_in_flight,
2341                   cli->cl_w_in_flight);
2342         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2343
2344         ptlrpcd_add_req(req);
2345         rc = 0;
2346         EXIT;
2347
2348 out:
2349         if (mem_tight != 0)
2350                 cfs_memory_pressure_restore(mpflag);
2351
2352         if (rc != 0) {
2353                 LASSERT(req == NULL);
2354
2355                 if (oa)
2356                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2357                 if (pga)
2358                         OBD_FREE(pga, sizeof(*pga) * page_count);
2359                 /* this should happen rarely and is pretty bad; it makes the
2360                  * pending list no longer follow the dirty order */
2361                 while (!list_empty(ext_list)) {
2362                         ext = list_entry(ext_list->next, struct osc_extent,
2363                                          oe_link);
2364                         list_del_init(&ext->oe_link);
2365                         osc_extent_finish(env, ext, 0, rc);
2366                 }
2367         }
2368         RETURN(rc);
2369 }
2370
2371 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2372 {
2373         int set = 0;
2374
2375         LASSERT(lock != NULL);
2376
2377         lock_res_and_lock(lock);
2378
2379         if (lock->l_ast_data == NULL)
2380                 lock->l_ast_data = data;
2381         if (lock->l_ast_data == data)
2382                 set = 1;
2383
2384         unlock_res_and_lock(lock);
2385
2386         return set;
2387 }
2388
2389 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2390                      void *cookie, struct lustre_handle *lockh,
2391                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2392                      int errcode)
2393 {
2394         bool intent = *flags & LDLM_FL_HAS_INTENT;
2395         int rc;
2396         ENTRY;
2397
2398         /* The request was created before ldlm_cli_enqueue call. */
2399         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2400                 struct ldlm_reply *rep;
2401
2402                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2403                 LASSERT(rep != NULL);
2404
2405                 rep->lock_policy_res1 =
2406                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2407                 if (rep->lock_policy_res1)
2408                         errcode = rep->lock_policy_res1;
2409                 if (!speculative)
2410                         *flags |= LDLM_FL_LVB_READY;
2411         } else if (errcode == ELDLM_OK) {
2412                 *flags |= LDLM_FL_LVB_READY;
2413         }
2414
2415         /* Call the update callback. */
2416         rc = (*upcall)(cookie, lockh, errcode);
2417
2418         /* release the reference taken in ldlm_cli_enqueue() */
2419         if (errcode == ELDLM_LOCK_MATCHED)
2420                 errcode = ELDLM_OK;
2421         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2422                 ldlm_lock_decref(lockh, mode);
2423
2424         RETURN(rc);
2425 }
2426
2427 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2428                           void *args, int rc)
2429 {
2430         struct osc_enqueue_args *aa = args;
2431         struct ldlm_lock *lock;
2432         struct lustre_handle *lockh = &aa->oa_lockh;
2433         enum ldlm_mode mode = aa->oa_mode;
2434         struct ost_lvb *lvb = aa->oa_lvb;
2435         __u32 lvb_len = sizeof(*lvb);
2436         __u64 flags = 0;
2437
2438         ENTRY;
2439
2440         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2441          * be valid. */
2442         lock = ldlm_handle2lock(lockh);
2443         LASSERTF(lock != NULL,
2444                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2445                  lockh->cookie, req, aa);
2446
2447         /* Take an additional reference so that a blocking AST that
2448          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2449          * to arrive after an upcall has been executed by
2450          * osc_enqueue_fini(). */
2451         ldlm_lock_addref(lockh, mode);
2452
2453         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2454         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2455
2456         /* Let the CP AST grant the lock first. */
2457         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2458
2459         if (aa->oa_speculative) {
2460                 LASSERT(aa->oa_lvb == NULL);
2461                 LASSERT(aa->oa_flags == NULL);
2462                 aa->oa_flags = &flags;
2463         }
2464
2465         /* Complete obtaining the lock procedure. */
2466         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2467                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2468                                    lockh, rc);
2469         /* Complete osc stuff. */
2470         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2471                               aa->oa_flags, aa->oa_speculative, rc);
2472
2473         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2474
2475         ldlm_lock_decref(lockh, mode);
2476         LDLM_LOCK_PUT(lock);
2477         RETURN(rc);
2478 }
2479
2480 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2481
2482 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2483  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2484  * with other synchronous requests; however, keeping some locks and trying to
2485  * obtain others may take a considerable amount of time in case of OST failure,
2486  * and when other sync requests do not get the released lock from a client, the
2487  * client is evicted from the cluster -- such scenarios make life difficult, so
2488  * release locks just after they are obtained. */
2489 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2490                      __u64 *flags, union ldlm_policy_data *policy,
2491                      struct ost_lvb *lvb, int kms_valid,
2492                      osc_enqueue_upcall_f upcall, void *cookie,
2493                      struct ldlm_enqueue_info *einfo,
2494                      struct ptlrpc_request_set *rqset, int async,
2495                      bool speculative)
2496 {
2497         struct obd_device *obd = exp->exp_obd;
2498         struct lustre_handle lockh = { 0 };
2499         struct ptlrpc_request *req = NULL;
2500         int intent = *flags & LDLM_FL_HAS_INTENT;
2501         __u64 match_flags = *flags;
2502         enum ldlm_mode mode;
2503         int rc;
2504         ENTRY;
2505
2506         /* Filesystem lock extents are extended to page boundaries so that
2507          * dealing with the page cache is a little smoother.  */
2508         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2509         policy->l_extent.end |= ~PAGE_MASK;
2510
2511         /*
2512          * kms is not valid when either object is completely fresh (so that no
2513          * locks are cached), or object was evicted. In the latter case cached
2514          * lock cannot be used, because it would prime inode state with
2515          * potentially stale LVB.
2516          */
2517         if (!kms_valid)
2518                 goto no_match;
2519
2520         /* Next, search for already existing extent locks that will cover us */
2521         /* If we're trying to read, we also search for an existing PW lock.  The
2522          * VFS and page cache already protect us locally, so lots of readers/
2523          * writers can share a single PW lock.
2524          *
2525          * There are problems with conversion deadlocks, so instead of
2526          * converting a read lock to a write lock, we'll just enqueue a new
2527          * one.
2528          *
2529          * At some point we should cancel the read lock instead of making them
2530          * send us a blocking callback, but there are problems with canceling
2531          * locks out from other users right now, too. */
2532         mode = einfo->ei_mode;
2533         if (einfo->ei_mode == LCK_PR)
2534                 mode |= LCK_PW;
2535         /* Normal lock requests must wait for the LVB to be ready before
2536          * matching a lock; speculative lock requests do not need to,
2537          * because they will not actually use the lock. */
2538         if (!speculative)
2539                 match_flags |= LDLM_FL_LVB_READY;
2540         if (intent != 0)
2541                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2542         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2543                                einfo->ei_type, policy, mode, &lockh, 0);
2544         if (mode) {
2545                 struct ldlm_lock *matched;
2546
2547                 if (*flags & LDLM_FL_TEST_LOCK)
2548                         RETURN(ELDLM_OK);
2549
2550                 matched = ldlm_handle2lock(&lockh);
2551                 if (speculative) {
2552                         /* This DLM lock request is speculative, and does not
2553                          * have an associated IO request. Therefore if there
2554                          * is already a DLM lock, it will just inform the
2555                          * caller to cancel the request for this stripe. */
2556                         lock_res_and_lock(matched);
2557                         if (ldlm_extent_equal(&policy->l_extent,
2558                             &matched->l_policy_data.l_extent))
2559                                 rc = -EEXIST;
2560                         else
2561                                 rc = -ECANCELED;
2562                         unlock_res_and_lock(matched);
2563
2564                         ldlm_lock_decref(&lockh, mode);
2565                         LDLM_LOCK_PUT(matched);
2566                         RETURN(rc);
2567                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2568                         *flags |= LDLM_FL_LVB_READY;
2569
2570                         /* We already have a lock, and it's referenced. */
2571                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2572
2573                         ldlm_lock_decref(&lockh, mode);
2574                         LDLM_LOCK_PUT(matched);
2575                         RETURN(ELDLM_OK);
2576                 } else {
2577                         ldlm_lock_decref(&lockh, mode);
2578                         LDLM_LOCK_PUT(matched);
2579                 }
2580         }
2581
2582 no_match:
2583         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2584                 RETURN(-ENOLCK);
2585
2586         if (intent) {
2587                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2588                                            &RQF_LDLM_ENQUEUE_LVB);
2589                 if (req == NULL)
2590                         RETURN(-ENOMEM);
2591
2592                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2593                 if (rc) {
2594                         ptlrpc_request_free(req);
2595                         RETURN(rc);
2596                 }
2597
2598                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2599                                      sizeof(*lvb));
2600                 ptlrpc_request_set_replen(req);
2601         }
2602
2603         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2604         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2605
2606         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2607                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2608         if (async) {
2609                 if (!rc) {
2610                         struct osc_enqueue_args *aa;
2611                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2612                         aa = ptlrpc_req_async_args(req);
2613                         aa->oa_exp         = exp;
2614                         aa->oa_mode        = einfo->ei_mode;
2615                         aa->oa_type        = einfo->ei_type;
2616                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2617                         aa->oa_upcall      = upcall;
2618                         aa->oa_cookie      = cookie;
2619                         aa->oa_speculative = speculative;
2620                         if (!speculative) {
2621                                 aa->oa_flags  = flags;
2622                                 aa->oa_lvb    = lvb;
2623                         } else {
2624                                 /* speculative locks are essentially used to
2625                                  * enqueue a DLM lock in advance, so we don't
2626                                  * care about the result of the enqueue. */
2627                                 aa->oa_lvb    = NULL;
2628                                 aa->oa_flags  = NULL;
2629                         }
2630
2631                         req->rq_interpret_reply = osc_enqueue_interpret;
2632                         if (rqset == PTLRPCD_SET)
2633                                 ptlrpcd_add_req(req);
2634                         else
2635                                 ptlrpc_set_add_req(rqset, req);
2636                 } else if (intent) {
2637                         ptlrpc_req_finished(req);
2638                 }
2639                 RETURN(rc);
2640         }
2641
2642         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2643                               flags, speculative, rc);
2644         if (intent)
2645                 ptlrpc_req_finished(req);
2646
2647         RETURN(rc);
2648 }
2649
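/**
 * Look up an already existing extent lock that covers the given extent.
 *
 * The extent in \a policy is rounded out to page boundaries before
 * matching, and a PR request may also be satisfied by an existing PW lock.
 * On success \a lockh holds a referenced lock handle, unless
 * LDLM_FL_TEST_LOCK was set in \a flags.
 *
 * \retval 0 if no matching lock was found (or l_ast_data could not be set)
 * \retval the mode of the matched lock otherwise
 */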
2650 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2651                    enum ldlm_type type, union ldlm_policy_data *policy,
2652                    enum ldlm_mode mode, __u64 *flags, void *data,
2653                    struct lustre_handle *lockh, int unref)
2654 {
2655         struct obd_device *obd = exp->exp_obd;
2656         __u64 lflags = *flags;
2657         enum ldlm_mode rc;
2658         ENTRY;
2659
2660         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2661                 RETURN(-EIO);
2662
2663         /* Filesystem lock extents are extended to page boundaries so that
2664          * dealing with the page cache is a little smoother */
2665         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2666         policy->l_extent.end |= ~PAGE_MASK;
2667
2668         /* Next, search for already existing extent locks that will cover us */
2669         /* If we're trying to read, we also search for an existing PW lock.  The
2670          * VFS and page cache already protect us locally, so lots of readers/
2671          * writers can share a single PW lock. */
2672         rc = mode;
2673         if (mode == LCK_PR)
2674                 rc |= LCK_PW;
2675         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2676                              res_id, type, policy, rc, lockh, unref);
2677         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2678                 RETURN(rc);
2679
2680         if (data != NULL) {
2681                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2682
2683                 LASSERT(lock != NULL);
2684                 if (!osc_set_lock_data(lock, data)) {
2685                         ldlm_lock_decref(lockh, rc);
2686                         rc = 0;
2687                 }
2688                 LDLM_LOCK_PUT(lock);
2689         }
2690         RETURN(rc);
2691 }
2692
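/**
 * Interpret callback for an asynchronous OST_STATFS request: copy the
 * statfs reply into the caller's buffer and invoke the oi_cb_up()
 * completion callback with the result.
 */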
2693 static int osc_statfs_interpret(const struct lu_env *env,
2694                                 struct ptlrpc_request *req, void *args, int rc)
2695 {
2696         struct osc_async_args *aa = args;
2697         struct obd_statfs *msfs;
2698
2699         ENTRY;
2700         if (rc == -EBADR)
2701                 /*
2702                  * The request has in fact never been sent due to issues at
2703                  * a higher level (LOV).  Exit immediately since the caller
2704                  * is aware of the problem and takes care of the clean up.
2705                  */
2706                 RETURN(rc);
2707
2708         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2709             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2710                 GOTO(out, rc = 0);
2711
2712         if (rc != 0)
2713                 GOTO(out, rc);
2714
2715         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2716         if (msfs == NULL)
2717                 GOTO(out, rc = -EPROTO);
2718
2719         *aa->aa_oi->oi_osfs = *msfs;
2720 out:
2721         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2722
2723         RETURN(rc);
2724 }
2725
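/**
 * Queue an OST_STATFS request on \a rqset, to be interpreted by
 * osc_statfs_interpret().  If the cached data in obd_osfs is at least as
 * new as \a max_age, the cache is used instead and oi_cb_up() is called
 * directly.
 */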
2726 static int osc_statfs_async(struct obd_export *exp,
2727                             struct obd_info *oinfo, time64_t max_age,
2728                             struct ptlrpc_request_set *rqset)
2729 {
2730         struct obd_device     *obd = class_exp2obd(exp);
2731         struct ptlrpc_request *req;
2732         struct osc_async_args *aa;
2733         int rc;
2734         ENTRY;
2735
2736         if (obd->obd_osfs_age >= max_age) {
2737                 CDEBUG(D_SUPER,
2738                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2739                        obd->obd_name, &obd->obd_osfs,
2740                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2741                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2742                 spin_lock(&obd->obd_osfs_lock);
2743                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2744                 spin_unlock(&obd->obd_osfs_lock);
2745                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2746                 if (oinfo->oi_cb_up)
2747                         oinfo->oi_cb_up(oinfo, 0);
2748
2749                 RETURN(0);
2750         }
2751
2752         /* We could possibly pass max_age in the request (as an absolute
2753          * timestamp or a "seconds.usec ago") so the target can avoid doing
2754          * extra calls into the filesystem if that isn't necessary (e.g.
2755          * during mount that would help a bit).  Having relative timestamps
2756          * is not so great if request processing is slow, while absolute
2757          * timestamps are not ideal because they need time synchronization. */
2758         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2759         if (req == NULL)
2760                 RETURN(-ENOMEM);
2761
2762         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2763         if (rc) {
2764                 ptlrpc_request_free(req);
2765                 RETURN(rc);
2766         }
2767         ptlrpc_request_set_replen(req);
2768         req->rq_request_portal = OST_CREATE_PORTAL;
2769         ptlrpc_at_set_req_timeout(req);
2770
2771         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not be delayed or resent, to avoid
                 * deadlock */
2773                 req->rq_no_resend = 1;
2774                 req->rq_no_delay = 1;
2775         }
2776
2777         req->rq_interpret_reply = osc_statfs_interpret;
2778         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2779         aa = ptlrpc_req_async_args(req);
2780         aa->aa_oi = oinfo;
2781
2782         ptlrpc_set_add_req(rqset, req);
2783         RETURN(0);
2784 }
2785
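/**
 * Synchronous statfs: send an OST_STATFS request, wait for the reply and
 * copy the result into \a osfs.  For illustration only (a sketch, not an
 * actual call site), a caller accepting data up to one second old might do:
 *
 *	struct obd_statfs osfs;
 *	int rc = obd_statfs(env, exp, &osfs, ktime_get_seconds() - 1, 0);
 */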
2786 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2787                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2788 {
2789         struct obd_device     *obd = class_exp2obd(exp);
2790         struct obd_statfs     *msfs;
2791         struct ptlrpc_request *req;
2792         struct obd_import     *imp = NULL;
2793         int rc;
2794         ENTRY;

        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export() (Bug 15684). */
2799         down_read(&obd->u.cli.cl_sem);
2800         if (obd->u.cli.cl_import)
2801                 imp = class_import_get(obd->u.cli.cl_import);
2802         up_read(&obd->u.cli.cl_sem);
2803         if (!imp)
2804                 RETURN(-ENODEV);
2805
2806         /* We could possibly pass max_age in the request (as an absolute
2807          * timestamp or a "seconds.usec ago") so the target can avoid doing
2808          * extra calls into the filesystem if that isn't necessary (e.g.
2809          * during mount that would help a bit).  Having relative timestamps
2810          * is not so great if request processing is slow, while absolute
2811          * timestamps are not ideal because they need time synchronization. */
2812         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2813
2814         class_import_put(imp);
2815
2816         if (req == NULL)
2817                 RETURN(-ENOMEM);
2818
2819         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2820         if (rc) {
2821                 ptlrpc_request_free(req);
2822                 RETURN(rc);
2823         }
2824         ptlrpc_request_set_replen(req);
2825         req->rq_request_portal = OST_CREATE_PORTAL;
2826         ptlrpc_at_set_req_timeout(req);
2827
2828         if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not be delayed or resent, to avoid
                 * deadlock */
2830                 req->rq_no_resend = 1;
2831                 req->rq_no_delay = 1;
2832         }
2833
2834         rc = ptlrpc_queue_wait(req);
2835         if (rc)
2836                 GOTO(out, rc);
2837
2838         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2839         if (msfs == NULL)
2840                 GOTO(out, rc = -EPROTO);
2841
2842         *osfs = *msfs;
2843
2844         EXIT;
2845 out:
2846         ptlrpc_req_finished(req);
2847         return rc;
2848 }
2849
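/**
 * Handle ioctls on the OSC device.  Only client recovery
 * (OBD_IOC_CLIENT_RECOVER) and import (de)activation (IOC_OSC_SET_ACTIVE)
 * are recognised; anything else returns -ENOTTY.
 */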
2850 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2851                          void *karg, void __user *uarg)
2852 {
2853         struct obd_device *obd = exp->exp_obd;
2854         struct obd_ioctl_data *data = karg;
2855         int rc = 0;
2856
2857         ENTRY;
2858         if (!try_module_get(THIS_MODULE)) {
2859                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2860                        module_name(THIS_MODULE));
                RETURN(-EINVAL);
2862         }
2863         switch (cmd) {
2864         case OBD_IOC_CLIENT_RECOVER:
2865                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2866                                            data->ioc_inlbuf1, 0);
2867                 if (rc > 0)
2868                         rc = 0;
2869                 break;
2870         case IOC_OSC_SET_ACTIVE:
2871                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2872                                               data->ioc_offset);
2873                 break;
2874         default:
2875                 rc = -ENOTTY;
2876                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2877                        obd->obd_name, cmd, current_comm(), rc);
2878                 break;
2879         }
2880
2881         module_put(THIS_MODULE);
        RETURN(rc);
2883 }
2884
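/**
 * Set a named parameter on this export.  Some keys (checksum, sptlrpc
 * config, context flush, LRU shrink) are handled locally; everything else
 * is packed into an OST_SET_INFO (or OST_SET_GRANT_INFO) request for the
 * OST.  For illustration only, a hypothetical caller could enable
 * checksums roughly like:
 *
 *	int on = 1;
 *	rc = obd_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 *
 * (a hedged sketch; real callers are expected to go through LOV, see the
 * comment below).
 */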
2885 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2886                        u32 keylen, void *key, u32 vallen, void *val,
2887                        struct ptlrpc_request_set *set)
2888 {
2889         struct ptlrpc_request *req;
2890         struct obd_device     *obd = exp->exp_obd;
2891         struct obd_import     *imp = class_exp2cliimp(exp);
2892         char                  *tmp;
2893         int                    rc;
2894         ENTRY;
2895
2896         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2897
2898         if (KEY_IS(KEY_CHECKSUM)) {
2899                 if (vallen != sizeof(int))
2900                         RETURN(-EINVAL);
2901                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2902                 RETURN(0);
2903         }
2904
2905         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2906                 sptlrpc_conf_client_adapt(obd);
2907                 RETURN(0);
2908         }
2909
2910         if (KEY_IS(KEY_FLUSH_CTX)) {
2911                 sptlrpc_import_flush_my_ctx(imp);
2912                 RETURN(0);
2913         }
2914
2915         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2916                 struct client_obd *cli = &obd->u.cli;
2917                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2918                 long target = *(long *)val;
2919
2920                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2921                 *(long *)val -= nr;
2922                 RETURN(0);
2923         }
2924
2925         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2926                 RETURN(-EINVAL);
2927
        /* We pass all other commands directly to OST. Since nobody calls OSC
         * methods directly and everybody is supposed to go through LOV, we
         * assume LOV checked invalid values for us.
         * The only recognised values so far are evict_by_nid and mds_conn.
         * Even if something bad goes through, we'd get a -EINVAL from OST
         * anyway. */
2934
2935         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2936                                                 &RQF_OST_SET_GRANT_INFO :
2937                                                 &RQF_OBD_SET_INFO);
2938         if (req == NULL)
2939                 RETURN(-ENOMEM);
2940
2941         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2942                              RCL_CLIENT, keylen);
2943         if (!KEY_IS(KEY_GRANT_SHRINK))
2944                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2945                                      RCL_CLIENT, vallen);
2946         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2947         if (rc) {
2948                 ptlrpc_request_free(req);
2949                 RETURN(rc);
2950         }
2951
2952         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2953         memcpy(tmp, key, keylen);
2954         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2955                                                         &RMF_OST_BODY :
2956                                                         &RMF_SETINFO_VAL);
2957         memcpy(tmp, val, vallen);
2958
2959         if (KEY_IS(KEY_GRANT_SHRINK)) {
2960                 struct osc_grant_args *aa;
2961                 struct obdo *oa;
2962
2963                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2964                 aa = ptlrpc_req_async_args(req);
2965                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2966                 if (!oa) {
2967                         ptlrpc_req_finished(req);
2968                         RETURN(-ENOMEM);
2969                 }
2970                 *oa = ((struct ost_body *)val)->oa;
2971                 aa->aa_oa = oa;
2972                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2973         }
2974
2975         ptlrpc_request_set_replen(req);
2976         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2977                 LASSERT(set != NULL);
2978                 ptlrpc_set_add_req(set, req);
2979                 ptlrpc_check_set(NULL, set);
2980         } else {
2981                 ptlrpcd_add_req(req);
2982         }
2983
2984         RETURN(0);
2985 }
2986 EXPORT_SYMBOL(osc_set_info_async);
2987
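/**
 * Refresh the grant fields of the connect data from the current client
 * state before a reconnect, so the OST can restore this client's grant,
 * and reset cl_lost_grant.
 */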
2988 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2989                   struct obd_device *obd, struct obd_uuid *cluuid,
2990                   struct obd_connect_data *data, void *localdata)
2991 {
2992         struct client_obd *cli = &obd->u.cli;
2993
2994         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2995                 long lost_grant;
2996                 long grant;
2997
2998                 spin_lock(&cli->cl_loi_list_lock);
2999                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3000                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3001                         /* restore ocd_grant_blkbits as client page bits */
3002                         data->ocd_grant_blkbits = PAGE_SHIFT;
3003                         grant += cli->cl_dirty_grant;
3004                 } else {
3005                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3006                 }
3007                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3008                 lost_grant = cli->cl_lost_grant;
3009                 cli->cl_lost_grant = 0;
3010                 spin_unlock(&cli->cl_loi_list_lock);
3011
                CDEBUG(D_RPCTRACE,
                       "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
                       data->ocd_connect_flags, data->ocd_version,
                       data->ocd_grant, lost_grant);
3015         }
3016
3017         RETURN(0);
3018 }
3019 EXPORT_SYMBOL(osc_reconnect);
3020
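/**
 * Disconnect the export and remove this client from the grant shrink
 * list; see the ordering note below for why the list removal must come
 * after client_disconnect_export().
 */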
3021 int osc_disconnect(struct obd_export *exp)
3022 {
3023         struct obd_device *obd = class_exp2obd(exp);
3024         int rc;
3025
3026         rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     osc_init_grant
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The grant shrink thread triggers the shrink (BUG 18662).
         */
3042         osc_del_grant_list(&obd->u.cli);
3043         return rc;
3044 }
3045 EXPORT_SYMBOL(osc_disconnect);
3046
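/**
 * cfs_hash iterator callback: invalidate the osc_object (if any) attached
 * to the granted locks of this LDLM resource, and clear LDLM_FL_CLEANED
 * so the locks can be cancelled by a later namespace cleanup pass.
 */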
3047 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3048                                  struct hlist_node *hnode, void *arg)
3049 {
3050         struct lu_env *env = arg;
3051         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3052         struct ldlm_lock *lock;
3053         struct osc_object *osc = NULL;
3054         ENTRY;
3055
3056         lock_res(res);
3057         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3058                 if (lock->l_ast_data != NULL && osc == NULL) {
3059                         osc = lock->l_ast_data;
3060                         cl_object_get(osc2cl(osc));
3061                 }
3062
                /* Clear the LDLM_FL_CLEANED flag to make sure the lock will
                 * be canceled by the 2nd round of ldlm_namespace_cleanup()
                 * calls in osc_import_event(). */
3066                 ldlm_clear_cleaned(lock);
3067         }
3068         unlock_res(res);
3069
3070         if (osc != NULL) {
3071                 osc_object_invalidate(env, osc);
3072                 cl_object_put(env, osc2cl(osc));
3073         }
3074
3075         RETURN(0);
3076 }
3077 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3078
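/**
 * Handle import state change events: reset grant state on disconnect,
 * invalidate cached objects and locks on import invalidation, initialize
 * grant and the request portal once connect data is received, and notify
 * the observer (typically LOV) of (in)activation events.
 */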
3079 static int osc_import_event(struct obd_device *obd,
3080                             struct obd_import *imp,
3081                             enum obd_import_event event)
3082 {
3083         struct client_obd *cli;
3084         int rc = 0;
3085
3086         ENTRY;
3087         LASSERT(imp->imp_obd == obd);
3088
3089         switch (event) {
3090         case IMP_EVENT_DISCON: {
3091                 cli = &obd->u.cli;
3092                 spin_lock(&cli->cl_loi_list_lock);
3093                 cli->cl_avail_grant = 0;
3094                 cli->cl_lost_grant = 0;
3095                 spin_unlock(&cli->cl_loi_list_lock);
3096                 break;
3097         }
3098         case IMP_EVENT_INACTIVE: {
3099                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3100                 break;
3101         }
3102         case IMP_EVENT_INVALIDATE: {
3103                 struct ldlm_namespace *ns = obd->obd_namespace;
3104                 struct lu_env         *env;
3105                 __u16                  refcheck;
3106
3107                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3108
3109                 env = cl_env_get(&refcheck);
3110                 if (!IS_ERR(env)) {
3111                         osc_io_unplug(env, &obd->u.cli, NULL);
3112
3113                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3114                                                  osc_ldlm_resource_invalidate,
3115                                                  env, 0);
3116                         cl_env_put(env, &refcheck);
3117
3118                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
3121                 break;
3122         }
3123         case IMP_EVENT_ACTIVE: {
3124                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3125                 break;
3126         }
3127         case IMP_EVENT_OCD: {
3128                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3129
3130                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3131                         osc_init_grant(&obd->u.cli, ocd);
3132
3133                 /* See bug 7198 */
3134                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;
3136
3137                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3138                 break;
3139         }
3140         case IMP_EVENT_DEACTIVATE: {
3141                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3142                 break;
3143         }
3144         case IMP_EVENT_ACTIVATE: {
3145                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3146                 break;
3147         }
3148         default:
3149                 CERROR("Unknown import event %d\n", event);
3150                 LBUG();
3151         }
3152         RETURN(rc);
3153 }
3154
3155 /**
3156  * Determine whether the lock can be canceled before replaying the lock
3157  * during recovery, see bug16774 for detailed information.
3158  *
3159  * \retval zero the lock can't be canceled
3160  * \retval other ok to cancel
3161  */
3162 static int osc_cancel_weight(struct ldlm_lock *lock)
3163 {
3164         /*
         * Cancel all unused, granted extent locks.
3166          */
3167         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3168             ldlm_is_granted(lock) &&
3169             osc_ldlm_weigh_ast(lock) == 0)
3170                 RETURN(1);
3171
3172         RETURN(0);
3173 }
3174
3175 static int brw_queue_work(const struct lu_env *env, void *data)
3176 {
3177         struct client_obd *cli = data;
3178
3179         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3180
3181         osc_io_unplug(env, cli, NULL);
3182         RETURN(0);
3183 }
3184
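/**
 * Device setup shared with other OSC-like clients: take a ptlrpcd
 * reference, set up the client obd, allocate the writeback and LRU work
 * items, and initialize quota and grant shrink state.
 */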
3185 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3186 {
3187         struct client_obd *cli = &obd->u.cli;
3188         void *handler;
3189         int rc;
3190
3191         ENTRY;
3192
3193         rc = ptlrpcd_addref();
3194         if (rc)
3195                 RETURN(rc);
3196
3197         rc = client_obd_setup(obd, lcfg);
3198         if (rc)
                GOTO(out_ptlrpcd, rc);

3202         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3203         if (IS_ERR(handler))
3204                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3205         cli->cl_writeback_work = handler;
3206
3207         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3208         if (IS_ERR(handler))
3209                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3210         cli->cl_lru_work = handler;
3211
3212         rc = osc_quota_setup(obd);
3213         if (rc)
3214                 GOTO(out_ptlrpcd_work, rc);
3215
3216         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3217         osc_update_next_shrink(cli);
3218
3219         RETURN(rc);
3220
3221 out_ptlrpcd_work:
3222         if (cli->cl_writeback_work != NULL) {
3223                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3224                 cli->cl_writeback_work = NULL;
3225         }
3226         if (cli->cl_lru_work != NULL) {
3227                 ptlrpcd_destroy_work(cli->cl_lru_work);
3228                 cli->cl_lru_work = NULL;
3229         }
3230         client_obd_cleanup(obd);
3231 out_ptlrpcd:
3232         ptlrpcd_decref();
3233         RETURN(rc);
3234 }
3235 EXPORT_SYMBOL(osc_setup_common);
3236
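/**
 * OSC-specific setup on top of osc_setup_common(): register tunables,
 * pre-populate the shared request pool up to osc_reqpool_maxreqcount,
 * register the cancel weight callback, add this client to the grant
 * shrink list and configure idle disconnect.
 */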
3237 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3238 {
3239         struct client_obd *cli = &obd->u.cli;
3240         int                adding;
3241         int                added;
3242         int                req_count;
3243         int                rc;
3244
3245         ENTRY;
3246
3247         rc = osc_setup_common(obd, lcfg);
3248         if (rc < 0)
3249                 RETURN(rc);
3250
3251         rc = osc_tunables_init(obd);
3252         if (rc)
3253                 RETURN(rc);
3254
3255         /*
         * We try to control the total number of requests with an upper limit
         * of osc_reqpool_maxreqcount. A race may occasionally cause over-limit
         * allocation, but that is fine.
3259          */
3260         req_count = atomic_read(&osc_pool_req_count);
3261         if (req_count < osc_reqpool_maxreqcount) {
3262                 adding = cli->cl_max_rpcs_in_flight + 2;
3263                 if (req_count + adding > osc_reqpool_maxreqcount)
3264                         adding = osc_reqpool_maxreqcount - req_count;
3265
3266                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3267                 atomic_add(added, &osc_pool_req_count);
3268         }
3269
3270         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3271
3272         spin_lock(&osc_shrink_lock);
3273         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3274         spin_unlock(&osc_shrink_lock);
3275         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3276         cli->cl_import->imp_idle_debug = D_HA;
3277
3278         RETURN(0);
3279 }
3280
3281 int osc_precleanup_common(struct obd_device *obd)
3282 {
3283         struct client_obd *cli = &obd->u.cli;
3284         ENTRY;
3285
3286         /* LU-464
3287          * for echo client, export may be on zombie list, wait for
3288          * zombie thread to cull it, because cli.cl_import will be
3289          * cleared in client_disconnect_export():
3290          *   class_export_destroy() -> obd_cleanup() ->
3291          *   echo_device_free() -> echo_client_cleanup() ->
3292          *   obd_disconnect() -> osc_disconnect() ->
3293          *   client_disconnect_export()
3294          */
3295         obd_zombie_barrier();
3296         if (cli->cl_writeback_work) {
3297                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3298                 cli->cl_writeback_work = NULL;
3299         }
3300
3301         if (cli->cl_lru_work) {
3302                 ptlrpcd_destroy_work(cli->cl_lru_work);
3303                 cli->cl_lru_work = NULL;
3304         }
3305
3306         obd_cleanup_client_import(obd);
3307         RETURN(0);
3308 }
3309 EXPORT_SYMBOL(osc_precleanup_common);
3310
3311 static int osc_precleanup(struct obd_device *obd)
3312 {
3313         ENTRY;
3314
3315         osc_precleanup_common(obd);
3316
3317         ptlrpc_lprocfs_unregister_obd(obd);
3318         RETURN(0);
3319 }
3320
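/**
 * Final cleanup: remove this client from the grant shrink list and the
 * LRU cache, free the quota cache, clean up the client obd and drop the
 * ptlrpcd reference taken in osc_setup_common().
 */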
3321 int osc_cleanup_common(struct obd_device *obd)
3322 {
3323         struct client_obd *cli = &obd->u.cli;
3324         int rc;
3325
3326         ENTRY;
3327
3328         spin_lock(&osc_shrink_lock);
3329         list_del(&cli->cl_shrink_list);
3330         spin_unlock(&osc_shrink_lock);
3331
3332         /* lru cleanup */
3333         if (cli->cl_cache != NULL) {
3334                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3335                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3336                 list_del_init(&cli->cl_lru_osc);
3337                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3338                 cli->cl_lru_left = NULL;
3339                 cl_cache_decref(cli->cl_cache);
3340                 cli->cl_cache = NULL;
3341         }
3342
3343         /* free memory of osc quota cache */
3344         osc_quota_cleanup(obd);
3345
3346         rc = client_obd_cleanup(obd);
3347
3348         ptlrpcd_decref();
3349         RETURN(rc);
3350 }
3351 EXPORT_SYMBOL(osc_cleanup_common);
3352
3353 static struct obd_ops osc_obd_ops = {
3354         .o_owner                = THIS_MODULE,
3355         .o_setup                = osc_setup,
3356         .o_precleanup           = osc_precleanup,
3357         .o_cleanup              = osc_cleanup_common,
3358         .o_add_conn             = client_import_add_conn,
3359         .o_del_conn             = client_import_del_conn,
3360         .o_connect              = client_connect_import,
3361         .o_reconnect            = osc_reconnect,
3362         .o_disconnect           = osc_disconnect,
3363         .o_statfs               = osc_statfs,
3364         .o_statfs_async         = osc_statfs_async,
3365         .o_create               = osc_create,
3366         .o_destroy              = osc_destroy,
3367         .o_getattr              = osc_getattr,
3368         .o_setattr              = osc_setattr,
3369         .o_iocontrol            = osc_iocontrol,
3370         .o_set_info_async       = osc_set_info_async,
3371         .o_import_event         = osc_import_event,
3372         .o_quotactl             = osc_quotactl,
3373 };
3374
3375 static struct shrinker *osc_cache_shrinker;
3376 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3377 DEFINE_SPINLOCK(osc_shrink_lock);
3378
3379 #ifndef HAVE_SHRINKER_COUNT
3380 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3381 {
3382         struct shrink_control scv = {
3383                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3384                 .gfp_mask   = shrink_param(sc, gfp_mask)
3385         };
3386 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3387         struct shrinker *shrinker = NULL;
3388 #endif
3389
3390         (void)osc_cache_shrink_scan(shrinker, &scv);
3391
3392         return osc_cache_shrink_count(shrinker, &scv);
3393 }
3394 #endif
3395
3396 static int __init osc_init(void)
3397 {
3398         unsigned int reqpool_size;
3399         unsigned int reqsize;
3400         int rc;
3401         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3402                          osc_cache_shrink_count, osc_cache_shrink_scan);
3403         ENTRY;
3404
        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
3408         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3409
3410         rc = lu_kmem_init(osc_caches);
3411         if (rc)
3412                 RETURN(rc);
3413
3414         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3415                                  LUSTRE_OSC_NAME, &osc_device_type);
3416         if (rc)
3417                 GOTO(out_kmem, rc);
3418
3419         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3420
        /* This is obviously too much memory; we only prevent overflow here. */
3422         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3423                 GOTO(out_type, rc = -EINVAL);
3424
3425         reqpool_size = osc_reqpool_mem_max << 20;
3426
3427         reqsize = 1;
3428         while (reqsize < OST_IO_MAXREQSIZE)
3429                 reqsize = reqsize << 1;
3430
3431         /*
         * We don't enlarge the request count in the OSC pool according to
         * cl_max_rpcs_in_flight. Allocation from the pool is only tried
         * after normal allocation has failed, so a small OSC pool won't
         * cause much performance degradation in most cases.
3436          */
3437         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3438
3439         atomic_set(&osc_pool_req_count, 0);
3440         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3441                                           ptlrpc_add_rqs_to_pool);
3442
3443         if (osc_rq_pool == NULL)
3444                 GOTO(out_type, rc = -ENOMEM);
3445
3446         rc = osc_start_grant_work();
3447         if (rc != 0)
3448                 GOTO(out_req_pool, rc);
3449
3450         RETURN(rc);
3451
3452 out_req_pool:
3453         ptlrpc_free_rq_pool(osc_rq_pool);
3454 out_type:
3455         class_unregister_type(LUSTRE_OSC_NAME);
3456 out_kmem:
3457         lu_kmem_fini(osc_caches);
3458
3459         RETURN(rc);
3460 }
3461
3462 static void __exit osc_exit(void)
3463 {
3464         osc_stop_grant_work();
3465         remove_shrinker(osc_cache_shrinker);
3466         class_unregister_type(LUSTRE_OSC_NAME);
3467         lu_kmem_fini(osc_caches);
3468         ptlrpc_free_rq_pool(osc_rq_pool);
3469 }
3470
3471 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3472 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3473 MODULE_VERSION(LUSTRE_VERSION_STRING);
3474 MODULE_LICENSE("GPL");
3475
3476 module_init(osc_init);
3477 module_exit(osc_exit);