Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
54
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
58
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
61
62 #define osc_grant_args osc_brw_async_args
63
64 struct osc_setattr_args {
65         struct obdo             *sa_oa;
66         obd_enqueue_update_f     sa_upcall;
67         void                    *sa_cookie;
68 };
69
70 struct osc_fsync_args {
71         struct osc_object       *fa_obj;
72         struct obdo             *fa_oa;
73         obd_enqueue_update_f    fa_upcall;
74         void                    *fa_cookie;
75 };
76
77 struct osc_ladvise_args {
78         struct obdo             *la_oa;
79         obd_enqueue_update_f     la_upcall;
80         void                    *la_cookie;
81 };
82
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
85                          void *data, int rc);
86
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
88 {
89         struct ost_body *body;
90
91         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
92         LASSERT(body);
93
94         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
95 }
96
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
98                        struct obdo *oa)
99 {
100         struct ptlrpc_request   *req;
101         struct ost_body         *body;
102         int                      rc;
103
104         ENTRY;
105         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
106         if (req == NULL)
107                 RETURN(-ENOMEM);
108
109         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
110         if (rc) {
111                 ptlrpc_request_free(req);
112                 RETURN(rc);
113         }
114
115         osc_pack_req_body(req, oa);
116
117         ptlrpc_request_set_replen(req);
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
124         if (body == NULL)
125                 GOTO(out, rc = -EPROTO);
126
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
129
130         oa->o_blksize = cli_brw_size(exp->exp_obd);
131         oa->o_valid |= OBD_MD_FLBLKSZ;
132
133         EXIT;
134 out:
135         ptlrpc_req_finished(req);
136
137         return rc;
138 }
139
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
141                        struct obdo *oa)
142 {
143         struct ptlrpc_request   *req;
144         struct ost_body         *body;
145         int                      rc;
146
147         ENTRY;
148         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
149
150         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
151         if (req == NULL)
152                 RETURN(-ENOMEM);
153
154         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
155         if (rc) {
156                 ptlrpc_request_free(req);
157                 RETURN(rc);
158         }
159
160         osc_pack_req_body(req, oa);
161
162         ptlrpc_request_set_replen(req);
163
164         rc = ptlrpc_queue_wait(req);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
169         if (body == NULL)
170                 GOTO(out, rc = -EPROTO);
171
172         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
173
174         EXIT;
175 out:
176         ptlrpc_req_finished(req);
177
178         RETURN(rc);
179 }
180
181 static int osc_setattr_interpret(const struct lu_env *env,
182                                  struct ptlrpc_request *req, void *args, int rc)
183 {
184         struct osc_setattr_args *sa = args;
185         struct ost_body *body;
186
187         ENTRY;
188
189         if (rc != 0)
190                 GOTO(out, rc);
191
192         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193         if (body == NULL)
194                 GOTO(out, rc = -EPROTO);
195
196         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
197                              &body->oa);
198 out:
199         rc = sa->sa_upcall(sa->sa_cookie, rc);
200         RETURN(rc);
201 }
202
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204                       obd_enqueue_update_f upcall, void *cookie,
205                       struct ptlrpc_request_set *rqset)
206 {
207         struct ptlrpc_request   *req;
208         struct osc_setattr_args *sa;
209         int                      rc;
210
211         ENTRY;
212
213         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
214         if (req == NULL)
215                 RETURN(-ENOMEM);
216
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oa);
224
225         ptlrpc_request_set_replen(req);
226
227         /* do mds to ost setattr asynchronously */
228         if (!rqset) {
229                 /* Do not wait for response. */
230                 ptlrpcd_add_req(req);
231         } else {
232                 req->rq_interpret_reply = osc_setattr_interpret;
233
234                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235                 sa = ptlrpc_req_async_args(req);
236                 sa->sa_oa = oa;
237                 sa->sa_upcall = upcall;
238                 sa->sa_cookie = cookie;
239
240                 if (rqset == PTLRPCD_SET)
241                         ptlrpcd_add_req(req);
242                 else
243                         ptlrpc_set_add_req(rqset, req);
244         }
245
246         RETURN(0);
247 }
248
249 static int osc_ladvise_interpret(const struct lu_env *env,
250                                  struct ptlrpc_request *req,
251                                  void *arg, int rc)
252 {
253         struct osc_ladvise_args *la = arg;
254         struct ost_body *body;
255         ENTRY;
256
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
261         if (body == NULL)
262                 GOTO(out, rc = -EPROTO);
263
264         *la->la_oa = body->oa;
265 out:
266         rc = la->la_upcall(la->la_cookie, rc);
267         RETURN(rc);
268 }
269
270 /**
271  * If rqset is NULL, do not wait for response. Upcall and cookie could also
272  * be NULL in this case
273  */
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275                      struct ladvise_hdr *ladvise_hdr,
276                      obd_enqueue_update_f upcall, void *cookie,
277                      struct ptlrpc_request_set *rqset)
278 {
279         struct ptlrpc_request   *req;
280         struct ost_body         *body;
281         struct osc_ladvise_args *la;
282         int                      rc;
283         struct lu_ladvise       *req_ladvise;
284         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
285         int                      num_advise = ladvise_hdr->lah_count;
286         struct ladvise_hdr      *req_ladvise_hdr;
287         ENTRY;
288
289         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
290         if (req == NULL)
291                 RETURN(-ENOMEM);
292
293         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294                              num_advise * sizeof(*ladvise));
295         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
296         if (rc != 0) {
297                 ptlrpc_request_free(req);
298                 RETURN(rc);
299         }
300         req->rq_request_portal = OST_IO_PORTAL;
301         ptlrpc_at_set_req_timeout(req);
302
303         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
304         LASSERT(body);
305         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
306                              oa);
307
308         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309                                                  &RMF_OST_LADVISE_HDR);
310         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
311
312         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314         ptlrpc_request_set_replen(req);
315
316         if (rqset == NULL) {
317                 /* Do not wait for response. */
318                 ptlrpcd_add_req(req);
319                 RETURN(0);
320         }
321
322         req->rq_interpret_reply = osc_ladvise_interpret;
323         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324         la = ptlrpc_req_async_args(req);
325         la->la_oa = oa;
326         la->la_upcall = upcall;
327         la->la_cookie = cookie;
328
329         if (rqset == PTLRPCD_SET)
330                 ptlrpcd_add_req(req);
331         else
332                 ptlrpc_set_add_req(rqset, req);
333
334         RETURN(0);
335 }
336
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
338                       struct obdo *oa)
339 {
340         struct ptlrpc_request *req;
341         struct ost_body       *body;
342         int                    rc;
343         ENTRY;
344
345         LASSERT(oa != NULL);
346         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
347         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
348
349         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
350         if (req == NULL)
351                 GOTO(out, rc = -ENOMEM);
352
353         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
354         if (rc) {
355                 ptlrpc_request_free(req);
356                 GOTO(out, rc);
357         }
358
359         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
360         LASSERT(body);
361
362         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
363
364         ptlrpc_request_set_replen(req);
365
366         rc = ptlrpc_queue_wait(req);
367         if (rc)
368                 GOTO(out_req, rc);
369
370         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
371         if (body == NULL)
372                 GOTO(out_req, rc = -EPROTO);
373
374         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
376
377         oa->o_blksize = cli_brw_size(exp->exp_obd);
378         oa->o_valid |= OBD_MD_FLBLKSZ;
379
380         CDEBUG(D_HA, "transno: %lld\n",
381                lustre_msg_get_transno(req->rq_repmsg));
382 out_req:
383         ptlrpc_req_finished(req);
384 out:
385         RETURN(rc);
386 }
387
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389                    obd_enqueue_update_f upcall, void *cookie)
390 {
391         struct ptlrpc_request *req;
392         struct osc_setattr_args *sa;
393         struct obd_import *imp = class_exp2cliimp(exp);
394         struct ost_body *body;
395         int rc;
396
397         ENTRY;
398
399         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
400         if (req == NULL)
401                 RETURN(-ENOMEM);
402
403         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
404         if (rc < 0) {
405                 ptlrpc_request_free(req);
406                 RETURN(rc);
407         }
408
409         osc_set_io_portal(req);
410
411         ptlrpc_at_set_req_timeout(req);
412
413         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
414
415         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
416
417         ptlrpc_request_set_replen(req);
418
419         req->rq_interpret_reply = osc_setattr_interpret;
420         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421         sa = ptlrpc_req_async_args(req);
422         sa->sa_oa = oa;
423         sa->sa_upcall = upcall;
424         sa->sa_cookie = cookie;
425
426         ptlrpcd_add_req(req);
427
428         RETURN(0);
429 }
430 EXPORT_SYMBOL(osc_punch_send);
431
432 static int osc_sync_interpret(const struct lu_env *env,
433                               struct ptlrpc_request *req, void *args, int rc)
434 {
435         struct osc_fsync_args *fa = args;
436         struct ost_body *body;
437         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
438         unsigned long valid = 0;
439         struct cl_object *obj;
440         ENTRY;
441
442         if (rc != 0)
443                 GOTO(out, rc);
444
445         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
446         if (body == NULL) {
447                 CERROR("can't unpack ost_body\n");
448                 GOTO(out, rc = -EPROTO);
449         }
450
451         *fa->fa_oa = body->oa;
452         obj = osc2cl(fa->fa_obj);
453
454         /* Update osc object's blocks attribute */
455         cl_object_attr_lock(obj);
456         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
457                 attr->cat_blocks = body->oa.o_blocks;
458                 valid |= CAT_BLOCKS;
459         }
460
461         if (valid != 0)
462                 cl_object_attr_update(env, obj, attr, valid);
463         cl_object_attr_unlock(obj);
464
465 out:
466         rc = fa->fa_upcall(fa->fa_cookie, rc);
467         RETURN(rc);
468 }
469
470 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
471                   obd_enqueue_update_f upcall, void *cookie,
472                   struct ptlrpc_request_set *rqset)
473 {
474         struct obd_export     *exp = osc_export(obj);
475         struct ptlrpc_request *req;
476         struct ost_body       *body;
477         struct osc_fsync_args *fa;
478         int                    rc;
479         ENTRY;
480
481         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
482         if (req == NULL)
483                 RETURN(-ENOMEM);
484
485         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
486         if (rc) {
487                 ptlrpc_request_free(req);
488                 RETURN(rc);
489         }
490
491         /* overload the size and blocks fields in the oa with start/end */
492         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
493         LASSERT(body);
494         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
495
496         ptlrpc_request_set_replen(req);
497         req->rq_interpret_reply = osc_sync_interpret;
498
499         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
500         fa = ptlrpc_req_async_args(req);
501         fa->fa_obj = obj;
502         fa->fa_oa = oa;
503         fa->fa_upcall = upcall;
504         fa->fa_cookie = cookie;
505
506         if (rqset == PTLRPCD_SET)
507                 ptlrpcd_add_req(req);
508         else
509                 ptlrpc_set_add_req(rqset, req);
510
511         RETURN (0);
512 }
513
514 /* Find and cancel locally locks matched by @mode in the resource found by
515  * @objid. Found locks are added into @cancel list. Returns the amount of
516  * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518                                    struct list_head *cancels,
519                                    enum ldlm_mode mode, __u64 lock_flags)
520 {
521         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522         struct ldlm_res_id res_id;
523         struct ldlm_resource *res;
524         int count;
525         ENTRY;
526
527         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528          * export) but disabled through procfs (flag in NS).
529          *
530          * This distinguishes from a case when ELC is not supported originally,
531          * when we still want to cancel locks in advance and just cancel them
532          * locally, without sending any RPC. */
533         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
534                 RETURN(0);
535
536         ostid_build_res_name(&oa->o_oi, &res_id);
537         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
538         if (IS_ERR(res))
539                 RETURN(0);
540
541         LDLM_RESOURCE_ADDREF(res);
542         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543                                            lock_flags, 0, NULL);
544         LDLM_RESOURCE_DELREF(res);
545         ldlm_resource_putref(res);
546         RETURN(count);
547 }
548
549 static int osc_destroy_interpret(const struct lu_env *env,
550                                  struct ptlrpc_request *req, void *args, int rc)
551 {
552         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
553
554         atomic_dec(&cli->cl_destroy_in_flight);
555         wake_up(&cli->cl_destroy_waitq);
556
557         return 0;
558 }
559
560 static int osc_can_send_destroy(struct client_obd *cli)
561 {
562         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563             cli->cl_max_rpcs_in_flight) {
564                 /* The destroy request can be sent */
565                 return 1;
566         }
567         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568             cli->cl_max_rpcs_in_flight) {
569                 /*
570                  * The counter has been modified between the two atomic
571                  * operations.
572                  */
573                 wake_up(&cli->cl_destroy_waitq);
574         }
575         return 0;
576 }
577
578 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
579                        struct obdo *oa)
580 {
581         struct client_obd     *cli = &exp->exp_obd->u.cli;
582         struct ptlrpc_request *req;
583         struct ost_body       *body;
584         struct list_head       cancels = LIST_HEAD_INIT(cancels);
585         int rc, count;
586         ENTRY;
587
588         if (!oa) {
589                 CDEBUG(D_INFO, "oa NULL\n");
590                 RETURN(-EINVAL);
591         }
592
593         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
594                                         LDLM_FL_DISCARD_DATA);
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
597         if (req == NULL) {
598                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
599                 RETURN(-ENOMEM);
600         }
601
602         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
603                                0, &cancels, count);
604         if (rc) {
605                 ptlrpc_request_free(req);
606                 RETURN(rc);
607         }
608
609         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
610         ptlrpc_at_set_req_timeout(req);
611
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
615
616         ptlrpc_request_set_replen(req);
617
618         req->rq_interpret_reply = osc_destroy_interpret;
619         if (!osc_can_send_destroy(cli)) {
620                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
621
622                 /*
623                  * Wait until the number of on-going destroy RPCs drops
624                  * under max_rpc_in_flight
625                  */
626                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
627                                             osc_can_send_destroy(cli), &lwi);
628                 if (rc) {
629                         ptlrpc_req_finished(req);
630                         RETURN(rc);
631                 }
632         }
633
634         /* Do not wait for response */
635         ptlrpcd_add_req(req);
636         RETURN(0);
637 }
638
639 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
640                                 long writing_bytes)
641 {
642         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
643
644         LASSERT(!(oa->o_valid & bits));
645
646         oa->o_valid |= bits;
647         spin_lock(&cli->cl_loi_list_lock);
648         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
649                 oa->o_dirty = cli->cl_dirty_grant;
650         else
651                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
652         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
653                      cli->cl_dirty_max_pages)) {
654                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
655                        cli->cl_dirty_pages, cli->cl_dirty_transit,
656                        cli->cl_dirty_max_pages);
657                 oa->o_undirty = 0;
658         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
659                             atomic_long_read(&obd_dirty_transit_pages) >
660                             (long)(obd_max_dirty_pages + 1))) {
661                 /* The atomic_read() allowing the atomic_inc() are
662                  * not covered by a lock thus they may safely race and trip
663                  * this CERROR() unless we add in a small fudge factor (+1). */
664                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
665                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
666                        atomic_long_read(&obd_dirty_transit_pages),
667                        obd_max_dirty_pages);
668                 oa->o_undirty = 0;
669         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
670                             0x7fffffff)) {
671                 CERROR("dirty %lu - dirty_max %lu too big???\n",
672                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
673                 oa->o_undirty = 0;
674         } else {
675                 unsigned long nrpages;
676                 unsigned long undirty;
677
678                 nrpages = cli->cl_max_pages_per_rpc;
679                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
680                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
681                 undirty = nrpages << PAGE_SHIFT;
682                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
683                                  GRANT_PARAM)) {
684                         int nrextents;
685
686                         /* take extent tax into account when asking for more
687                          * grant space */
688                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
689                                      cli->cl_max_extent_pages;
690                         undirty += nrextents * cli->cl_grant_extent_tax;
691                 }
692                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
693                  * to add extent tax, etc.
694                  */
695                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
696                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
697         }
698         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
699         oa->o_dropped = cli->cl_lost_grant;
700         cli->cl_lost_grant = 0;
701         spin_unlock(&cli->cl_loi_list_lock);
702         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
703                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
704 }
705
706 void osc_update_next_shrink(struct client_obd *cli)
707 {
708         cli->cl_next_shrink_grant = ktime_get_seconds() +
709                                     cli->cl_grant_shrink_interval;
710
711         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
712                cli->cl_next_shrink_grant);
713 }
714
715 static void __osc_update_grant(struct client_obd *cli, u64 grant)
716 {
717         spin_lock(&cli->cl_loi_list_lock);
718         cli->cl_avail_grant += grant;
719         spin_unlock(&cli->cl_loi_list_lock);
720 }
721
722 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
723 {
724         if (body->oa.o_valid & OBD_MD_FLGRANT) {
725                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
726                 __osc_update_grant(cli, body->oa.o_grant);
727         }
728 }
729
730 /**
731  * grant thread data for shrinking space.
732  */
733 struct grant_thread_data {
734         struct list_head        gtd_clients;
735         struct mutex            gtd_mutex;
736         unsigned long           gtd_stopped:1;
737 };
738 static struct grant_thread_data client_gtd;
739
740 static int osc_shrink_grant_interpret(const struct lu_env *env,
741                                       struct ptlrpc_request *req,
742                                       void *args, int rc)
743 {
744         struct osc_grant_args *aa = args;
745         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
746         struct ost_body *body;
747
748         if (rc != 0) {
749                 __osc_update_grant(cli, aa->aa_oa->o_grant);
750                 GOTO(out, rc);
751         }
752
753         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
754         LASSERT(body);
755         osc_update_grant(cli, body);
756 out:
757         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
758
759         return rc;
760 }
761
762 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
763 {
764         spin_lock(&cli->cl_loi_list_lock);
765         oa->o_grant = cli->cl_avail_grant / 4;
766         cli->cl_avail_grant -= oa->o_grant;
767         spin_unlock(&cli->cl_loi_list_lock);
768         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
769                 oa->o_valid |= OBD_MD_FLFLAGS;
770                 oa->o_flags = 0;
771         }
772         oa->o_flags |= OBD_FL_SHRINK_GRANT;
773         osc_update_next_shrink(cli);
774 }
775
776 /* Shrink the current grant, either from some large amount to enough for a
777  * full set of in-flight RPCs, or if we have already shrunk to that limit
778  * then to enough for a single RPC.  This avoids keeping more grant than
779  * needed, and avoids shrinking the grant piecemeal. */
780 static int osc_shrink_grant(struct client_obd *cli)
781 {
782         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
783                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
784
785         spin_lock(&cli->cl_loi_list_lock);
786         if (cli->cl_avail_grant <= target_bytes)
787                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
788         spin_unlock(&cli->cl_loi_list_lock);
789
790         return osc_shrink_grant_to_target(cli, target_bytes);
791 }
792
793 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
794 {
795         int                     rc = 0;
796         struct ost_body        *body;
797         ENTRY;
798
799         spin_lock(&cli->cl_loi_list_lock);
800         /* Don't shrink if we are already above or below the desired limit
801          * We don't want to shrink below a single RPC, as that will negatively
802          * impact block allocation and long-term performance. */
803         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
804                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
805
806         if (target_bytes >= cli->cl_avail_grant) {
807                 spin_unlock(&cli->cl_loi_list_lock);
808                 RETURN(0);
809         }
810         spin_unlock(&cli->cl_loi_list_lock);
811
812         OBD_ALLOC_PTR(body);
813         if (!body)
814                 RETURN(-ENOMEM);
815
816         osc_announce_cached(cli, &body->oa, 0);
817
818         spin_lock(&cli->cl_loi_list_lock);
819         if (target_bytes >= cli->cl_avail_grant) {
820                 /* available grant has changed since target calculation */
821                 spin_unlock(&cli->cl_loi_list_lock);
822                 GOTO(out_free, rc = 0);
823         }
824         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
825         cli->cl_avail_grant = target_bytes;
826         spin_unlock(&cli->cl_loi_list_lock);
827         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
828                 body->oa.o_valid |= OBD_MD_FLFLAGS;
829                 body->oa.o_flags = 0;
830         }
831         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
832         osc_update_next_shrink(cli);
833
834         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
835                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
836                                 sizeof(*body), body, NULL);
837         if (rc != 0)
838                 __osc_update_grant(cli, body->oa.o_grant);
839 out_free:
840         OBD_FREE_PTR(body);
841         RETURN(rc);
842 }
843
844 static int osc_should_shrink_grant(struct client_obd *client)
845 {
846         time64_t next_shrink = client->cl_next_shrink_grant;
847
848         if (client->cl_import == NULL)
849                 return 0;
850
851         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
852              OBD_CONNECT_GRANT_SHRINK) == 0)
853                 return 0;
854
855         if (ktime_get_seconds() >= next_shrink - 5) {
856                 /* Get the current RPC size directly, instead of going via:
857                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
858                  * Keep comment here so that it can be found by searching. */
859                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
860
861                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
862                     client->cl_avail_grant > brw_size)
863                         return 1;
864                 else
865                         osc_update_next_shrink(client);
866         }
867         return 0;
868 }
869
870 #define GRANT_SHRINK_RPC_BATCH  100
871
872 static struct delayed_work work;
873
874 static void osc_grant_work_handler(struct work_struct *data)
875 {
876         struct client_obd *cli;
877         int rpc_sent;
878         bool init_next_shrink = true;
879         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
880
881         rpc_sent = 0;
882         mutex_lock(&client_gtd.gtd_mutex);
883         list_for_each_entry(cli, &client_gtd.gtd_clients,
884                             cl_grant_chain) {
885                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
886                     osc_should_shrink_grant(cli)) {
887                         osc_shrink_grant(cli);
888                         rpc_sent++;
889                 }
890
891                 if (!init_next_shrink) {
892                         if (cli->cl_next_shrink_grant < next_shrink &&
893                             cli->cl_next_shrink_grant > ktime_get_seconds())
894                                 next_shrink = cli->cl_next_shrink_grant;
895                 } else {
896                         init_next_shrink = false;
897                         next_shrink = cli->cl_next_shrink_grant;
898                 }
899         }
900         mutex_unlock(&client_gtd.gtd_mutex);
901
902         if (client_gtd.gtd_stopped == 1)
903                 return;
904
905         if (next_shrink > ktime_get_seconds()) {
906                 time64_t delay = next_shrink - ktime_get_seconds();
907
908                 schedule_delayed_work(&work, cfs_time_seconds(delay));
909         } else {
910                 schedule_work(&work.work);
911         }
912 }
913
914 void osc_schedule_grant_work(void)
915 {
916         cancel_delayed_work_sync(&work);
917         schedule_work(&work.work);
918 }
919
920 /**
921  * Start grant thread for returing grant to server for idle clients.
922  */
923 static int osc_start_grant_work(void)
924 {
925         client_gtd.gtd_stopped = 0;
926         mutex_init(&client_gtd.gtd_mutex);
927         INIT_LIST_HEAD(&client_gtd.gtd_clients);
928
929         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
930         schedule_work(&work.work);
931
932         return 0;
933 }
934
935 static void osc_stop_grant_work(void)
936 {
937         client_gtd.gtd_stopped = 1;
938         cancel_delayed_work_sync(&work);
939 }
940
941 static void osc_add_grant_list(struct client_obd *client)
942 {
943         mutex_lock(&client_gtd.gtd_mutex);
944         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
945         mutex_unlock(&client_gtd.gtd_mutex);
946 }
947
948 static void osc_del_grant_list(struct client_obd *client)
949 {
950         if (list_empty(&client->cl_grant_chain))
951                 return;
952
953         mutex_lock(&client_gtd.gtd_mutex);
954         list_del_init(&client->cl_grant_chain);
955         mutex_unlock(&client_gtd.gtd_mutex);
956 }
957
958 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
959 {
960         /*
961          * ocd_grant is the total grant amount we're expect to hold: if we've
962          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
963          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
964          * dirty.
965          *
966          * race is tolerable here: if we're evicted, but imp_state already
967          * left EVICTED state, then cl_dirty_pages must be 0 already.
968          */
969         spin_lock(&cli->cl_loi_list_lock);
970         cli->cl_avail_grant = ocd->ocd_grant;
971         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
972                 cli->cl_avail_grant -= cli->cl_reserved_grant;
973                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
974                         cli->cl_avail_grant -= cli->cl_dirty_grant;
975                 else
976                         cli->cl_avail_grant -=
977                                         cli->cl_dirty_pages << PAGE_SHIFT;
978         }
979
980         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
981                 u64 size;
982                 int chunk_mask;
983
984                 /* overhead for each extent insertion */
985                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
986                 /* determine the appropriate chunk size used by osc_extent. */
987                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
988                                           ocd->ocd_grant_blkbits);
989                 /* max_pages_per_rpc must be chunk aligned */
990                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
991                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
992                                              ~chunk_mask) & chunk_mask;
993                 /* determine maximum extent size, in #pages */
994                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
995                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
996                 if (cli->cl_max_extent_pages == 0)
997                         cli->cl_max_extent_pages = 1;
998         } else {
999                 cli->cl_grant_extent_tax = 0;
1000                 cli->cl_chunkbits = PAGE_SHIFT;
1001                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1002         }
1003         spin_unlock(&cli->cl_loi_list_lock);
1004
1005         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1006                 "chunk bits: %d cl_max_extent_pages: %d\n",
1007                 cli_name(cli),
1008                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1009                 cli->cl_max_extent_pages);
1010
1011         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1012                 osc_add_grant_list(cli);
1013 }
1014 EXPORT_SYMBOL(osc_init_grant);
1015
1016 /* We assume that the reason this OSC got a short read is because it read
1017  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1018  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1019  * this stripe never got written at or beyond this stripe offset yet. */
1020 static void handle_short_read(int nob_read, size_t page_count,
1021                               struct brw_page **pga)
1022 {
1023         char *ptr;
1024         int i = 0;
1025
1026         /* skip bytes read OK */
1027         while (nob_read > 0) {
1028                 LASSERT (page_count > 0);
1029
1030                 if (pga[i]->count > nob_read) {
1031                         /* EOF inside this page */
1032                         ptr = kmap(pga[i]->pg) +
1033                                 (pga[i]->off & ~PAGE_MASK);
1034                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1035                         kunmap(pga[i]->pg);
1036                         page_count--;
1037                         i++;
1038                         break;
1039                 }
1040
1041                 nob_read -= pga[i]->count;
1042                 page_count--;
1043                 i++;
1044         }
1045
1046         /* zero remaining pages */
1047         while (page_count-- > 0) {
1048                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1049                 memset(ptr, 0, pga[i]->count);
1050                 kunmap(pga[i]->pg);
1051                 i++;
1052         }
1053 }
1054
1055 static int check_write_rcs(struct ptlrpc_request *req,
1056                            int requested_nob, int niocount,
1057                            size_t page_count, struct brw_page **pga)
1058 {
1059         int     i;
1060         __u32   *remote_rcs;
1061
1062         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1063                                                   sizeof(*remote_rcs) *
1064                                                   niocount);
1065         if (remote_rcs == NULL) {
1066                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1067                 return(-EPROTO);
1068         }
1069
1070         /* return error if any niobuf was in error */
1071         for (i = 0; i < niocount; i++) {
1072                 if ((int)remote_rcs[i] < 0) {
1073                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1074                                i, remote_rcs[i], req);
1075                         return remote_rcs[i];
1076                 }
1077
1078                 if (remote_rcs[i] != 0) {
1079                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1080                                 i, remote_rcs[i], req);
1081                         return(-EPROTO);
1082                 }
1083         }
1084         if (req->rq_bulk != NULL &&
1085             req->rq_bulk->bd_nob_transferred != requested_nob) {
1086                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1087                        req->rq_bulk->bd_nob_transferred, requested_nob);
1088                 return(-EPROTO);
1089         }
1090
1091         return (0);
1092 }
1093
1094 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1095 {
1096         if (p1->flag != p2->flag) {
1097                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1098                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1099                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1100
1101                 /* warn if we try to combine flags that we don't know to be
1102                  * safe to combine */
1103                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1104                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1105                               "report this at https://jira.whamcloud.com/\n",
1106                               p1->flag, p2->flag);
1107                 }
1108                 return 0;
1109         }
1110
1111         return (p1->off + p1->count == p2->off);
1112 }
1113
1114 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1115 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1116                                    size_t pg_count, struct brw_page **pga,
1117                                    int opc, obd_dif_csum_fn *fn,
1118                                    int sector_size,
1119                                    u32 *check_sum)
1120 {
1121         struct ahash_request *req;
1122         /* Used Adler as the default checksum type on top of DIF tags */
1123         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1124         struct page *__page;
1125         unsigned char *buffer;
1126         __u16 *guard_start;
1127         unsigned int bufsize;
1128         int guard_number;
1129         int used_number = 0;
1130         int used;
1131         u32 cksum;
1132         int rc = 0;
1133         int i = 0;
1134
1135         LASSERT(pg_count > 0);
1136
1137         __page = alloc_page(GFP_KERNEL);
1138         if (__page == NULL)
1139                 return -ENOMEM;
1140
1141         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1142         if (IS_ERR(req)) {
1143                 rc = PTR_ERR(req);
1144                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1145                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1146                 GOTO(out, rc);
1147         }
1148
1149         buffer = kmap(__page);
1150         guard_start = (__u16 *)buffer;
1151         guard_number = PAGE_SIZE / sizeof(*guard_start);
1152         while (nob > 0 && pg_count > 0) {
1153                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1154
1155                 /* corrupt the data before we compute the checksum, to
1156                  * simulate an OST->client data error */
1157                 if (unlikely(i == 0 && opc == OST_READ &&
1158                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1159                         unsigned char *ptr = kmap(pga[i]->pg);
1160                         int off = pga[i]->off & ~PAGE_MASK;
1161
1162                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1163                         kunmap(pga[i]->pg);
1164                 }
1165
1166                 /*
1167                  * The left guard number should be able to hold checksums of a
1168                  * whole page
1169                  */
1170                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1171                                                   pga[i]->off & ~PAGE_MASK,
1172                                                   count,
1173                                                   guard_start + used_number,
1174                                                   guard_number - used_number,
1175                                                   &used, sector_size,
1176                                                   fn);
1177                 if (rc)
1178                         break;
1179
1180                 used_number += used;
1181                 if (used_number == guard_number) {
1182                         cfs_crypto_hash_update_page(req, __page, 0,
1183                                 used_number * sizeof(*guard_start));
1184                         used_number = 0;
1185                 }
1186
1187                 nob -= pga[i]->count;
1188                 pg_count--;
1189                 i++;
1190         }
1191         kunmap(__page);
1192         if (rc)
1193                 GOTO(out, rc);
1194
1195         if (used_number != 0)
1196                 cfs_crypto_hash_update_page(req, __page, 0,
1197                         used_number * sizeof(*guard_start));
1198
1199         bufsize = sizeof(cksum);
1200         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1201
1202         /* For sending we only compute the wrong checksum instead
1203          * of corrupting the data so it is still correct on a redo */
1204         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1205                 cksum++;
1206
1207         *check_sum = cksum;
1208 out:
1209         __free_page(__page);
1210         return rc;
1211 }
1212 #else /* !CONFIG_CRC_T10DIF */
1213 #define obd_dif_ip_fn NULL
1214 #define obd_dif_crc_fn NULL
1215 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1216         -EOPNOTSUPP
1217 #endif /* CONFIG_CRC_T10DIF */
1218
1219 static int osc_checksum_bulk(int nob, size_t pg_count,
1220                              struct brw_page **pga, int opc,
1221                              enum cksum_types cksum_type,
1222                              u32 *cksum)
1223 {
1224         int                             i = 0;
1225         struct ahash_request           *req;
1226         unsigned int                    bufsize;
1227         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1228
1229         LASSERT(pg_count > 0);
1230
1231         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1232         if (IS_ERR(req)) {
1233                 CERROR("Unable to initialize checksum hash %s\n",
1234                        cfs_crypto_hash_name(cfs_alg));
1235                 return PTR_ERR(req);
1236         }
1237
1238         while (nob > 0 && pg_count > 0) {
1239                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1240
1241                 /* corrupt the data before we compute the checksum, to
1242                  * simulate an OST->client data error */
1243                 if (i == 0 && opc == OST_READ &&
1244                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1245                         unsigned char *ptr = kmap(pga[i]->pg);
1246                         int off = pga[i]->off & ~PAGE_MASK;
1247
1248                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1249                         kunmap(pga[i]->pg);
1250                 }
1251                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1252                                             pga[i]->off & ~PAGE_MASK,
1253                                             count);
1254                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1255                                (int)(pga[i]->off & ~PAGE_MASK));
1256
1257                 nob -= pga[i]->count;
1258                 pg_count--;
1259                 i++;
1260         }
1261
1262         bufsize = sizeof(*cksum);
1263         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1264
1265         /* For sending we only compute the wrong checksum instead
1266          * of corrupting the data so it is still correct on a redo */
1267         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1268                 (*cksum)++;
1269
1270         return 0;
1271 }
1272
1273 static int osc_checksum_bulk_rw(const char *obd_name,
1274                                 enum cksum_types cksum_type,
1275                                 int nob, size_t pg_count,
1276                                 struct brw_page **pga, int opc,
1277                                 u32 *check_sum)
1278 {
1279         obd_dif_csum_fn *fn = NULL;
1280         int sector_size = 0;
1281         int rc;
1282
1283         ENTRY;
1284         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1285
1286         if (fn)
1287                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1288                                              opc, fn, sector_size, check_sum);
1289         else
1290                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1291                                        check_sum);
1292
1293         RETURN(rc);
1294 }
1295
1296 static int
1297 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1298                      u32 page_count, struct brw_page **pga,
1299                      struct ptlrpc_request **reqp, int resend)
1300 {
1301         struct ptlrpc_request   *req;
1302         struct ptlrpc_bulk_desc *desc;
1303         struct ost_body         *body;
1304         struct obd_ioobj        *ioobj;
1305         struct niobuf_remote    *niobuf;
1306         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1307         struct osc_brw_async_args *aa;
1308         struct req_capsule      *pill;
1309         struct brw_page *pg_prev;
1310         void *short_io_buf;
1311         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1312
1313         ENTRY;
1314         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1315                 RETURN(-ENOMEM); /* Recoverable */
1316         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1317                 RETURN(-EINVAL); /* Fatal */
1318
1319         if ((cmd & OBD_BRW_WRITE) != 0) {
1320                 opc = OST_WRITE;
1321                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1322                                                 osc_rq_pool,
1323                                                 &RQF_OST_BRW_WRITE);
1324         } else {
1325                 opc = OST_READ;
1326                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1327         }
1328         if (req == NULL)
1329                 RETURN(-ENOMEM);
1330
1331         for (niocount = i = 1; i < page_count; i++) {
1332                 if (!can_merge_pages(pga[i - 1], pga[i]))
1333                         niocount++;
1334         }
1335
1336         pill = &req->rq_pill;
1337         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1338                              sizeof(*ioobj));
1339         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1340                              niocount * sizeof(*niobuf));
1341
1342         for (i = 0; i < page_count; i++)
1343                 short_io_size += pga[i]->count;
1344
1345         /* Check if read/write is small enough to be a short io. */
1346         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1347             !imp_connect_shortio(cli->cl_import))
1348                 short_io_size = 0;
1349
1350         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1351                              opc == OST_READ ? 0 : short_io_size);
1352         if (opc == OST_READ)
1353                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1354                                      short_io_size);
1355
1356         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1357         if (rc) {
1358                 ptlrpc_request_free(req);
1359                 RETURN(rc);
1360         }
1361         osc_set_io_portal(req);
1362
1363         ptlrpc_at_set_req_timeout(req);
1364         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1365          * retry logic */
1366         req->rq_no_retry_einprogress = 1;
1367
1368         if (short_io_size != 0) {
1369                 desc = NULL;
1370                 short_io_buf = NULL;
1371                 goto no_bulk;
1372         }
1373
1374         desc = ptlrpc_prep_bulk_imp(req, page_count,
1375                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1376                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1377                         PTLRPC_BULK_PUT_SINK) |
1378                         PTLRPC_BULK_BUF_KIOV,
1379                 OST_BULK_PORTAL,
1380                 &ptlrpc_bulk_kiov_pin_ops);
1381
1382         if (desc == NULL)
1383                 GOTO(out, rc = -ENOMEM);
1384         /* NB request now owns desc and will free it when it gets freed */
1385 no_bulk:
1386         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1387         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1388         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1389         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1390
1391         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1392
1393         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1394          * and from_kgid(), because they are asynchronous. Fortunately, variable
1395          * oa contains valid o_uid and o_gid in these two operations.
1396          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1397          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1398          * other process logic */
1399         body->oa.o_uid = oa->o_uid;
1400         body->oa.o_gid = oa->o_gid;
1401
1402         obdo_to_ioobj(oa, ioobj);
1403         ioobj->ioo_bufcnt = niocount;
1404         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1405          * that might be send for this request.  The actual number is decided
1406          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1407          * "max - 1" for old client compatibility sending "0", and also so the
1408          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1409         if (desc != NULL)
1410                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1411         else /* short io */
1412                 ioobj_max_brw_set(ioobj, 0);
1413
1414         if (short_io_size != 0) {
1415                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1416                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1417                         body->oa.o_flags = 0;
1418                 }
1419                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1420                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1421                        short_io_size);
1422                 if (opc == OST_WRITE) {
1423                         short_io_buf = req_capsule_client_get(pill,
1424                                                               &RMF_SHORT_IO);
1425                         LASSERT(short_io_buf != NULL);
1426                 }
1427         }
1428
1429         LASSERT(page_count > 0);
1430         pg_prev = pga[0];
1431         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1432                 struct brw_page *pg = pga[i];
1433                 int poff = pg->off & ~PAGE_MASK;
1434
1435                 LASSERT(pg->count > 0);
1436                 /* make sure there is no gap in the middle of page array */
1437                 LASSERTF(page_count == 1 ||
1438                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1439                           ergo(i > 0 && i < page_count - 1,
1440                                poff == 0 && pg->count == PAGE_SIZE)   &&
1441                           ergo(i == page_count - 1, poff == 0)),
1442                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1443                          i, page_count, pg, pg->off, pg->count);
1444                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1445                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1446                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1447                          i, page_count,
1448                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1449                          pg_prev->pg, page_private(pg_prev->pg),
1450                          pg_prev->pg->index, pg_prev->off);
1451                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1452                         (pg->flag & OBD_BRW_SRVLOCK));
1453                 if (short_io_size != 0 && opc == OST_WRITE) {
1454                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1455
1456                         LASSERT(short_io_size >= requested_nob + pg->count);
1457                         memcpy(short_io_buf + requested_nob,
1458                                ptr + poff,
1459                                pg->count);
1460                         ll_kunmap_atomic(ptr, KM_USER0);
1461                 } else if (short_io_size == 0) {
1462                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1463                                                          pg->count);
1464                 }
1465                 requested_nob += pg->count;
1466
1467                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1468                         niobuf--;
1469                         niobuf->rnb_len += pg->count;
1470                 } else {
1471                         niobuf->rnb_offset = pg->off;
1472                         niobuf->rnb_len    = pg->count;
1473                         niobuf->rnb_flags  = pg->flag;
1474                 }
1475                 pg_prev = pg;
1476         }
1477
1478         LASSERTF((void *)(niobuf - niocount) ==
1479                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1480                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1481                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1482
1483         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1484         if (resend) {
1485                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1486                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1487                         body->oa.o_flags = 0;
1488                 }
1489                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1490         }
1491
1492         if (osc_should_shrink_grant(cli))
1493                 osc_shrink_grant_local(cli, &body->oa);
1494
1495         /* size[REQ_REC_OFF] still sizeof (*body) */
1496         if (opc == OST_WRITE) {
1497                 if (cli->cl_checksum &&
1498                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1499                         /* store cl_cksum_type in a local variable since
1500                          * it can be changed via lprocfs */
1501                         enum cksum_types cksum_type = cli->cl_cksum_type;
1502
1503                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1504                                 body->oa.o_flags = 0;
1505
1506                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1507                                                                 cksum_type);
1508                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1509
1510                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1511                                                   requested_nob, page_count,
1512                                                   pga, OST_WRITE,
1513                                                   &body->oa.o_cksum);
1514                         if (rc < 0) {
1515                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1516                                        rc);
1517                                 GOTO(out, rc);
1518                         }
1519                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1520                                body->oa.o_cksum);
1521
1522                         /* save this in 'oa', too, for later checking */
1523                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1524                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1525                                                            cksum_type);
1526                 } else {
1527                         /* clear out the checksum flag, in case this is a
1528                          * resend but cl_checksum is no longer set. b=11238 */
1529                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1530                 }
1531                 oa->o_cksum = body->oa.o_cksum;
1532                 /* 1 RC per niobuf */
1533                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1534                                      sizeof(__u32) * niocount);
1535         } else {
1536                 if (cli->cl_checksum &&
1537                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1538                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1539                                 body->oa.o_flags = 0;
1540                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1541                                 cli->cl_cksum_type);
1542                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1543                 }
1544
1545                 /* Client cksum has been already copied to wire obdo in previous
1546                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1547                  * resent due to cksum error, this will allow Server to
1548                  * check+dump pages on its side */
1549         }
1550         ptlrpc_request_set_replen(req);
1551
1552         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1553         aa = ptlrpc_req_async_args(req);
1554         aa->aa_oa = oa;
1555         aa->aa_requested_nob = requested_nob;
1556         aa->aa_nio_count = niocount;
1557         aa->aa_page_count = page_count;
1558         aa->aa_resends = 0;
1559         aa->aa_ppga = pga;
1560         aa->aa_cli = cli;
1561         INIT_LIST_HEAD(&aa->aa_oaps);
1562
1563         *reqp = req;
1564         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1565         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1566                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1567                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1568         RETURN(0);
1569
1570  out:
1571         ptlrpc_req_finished(req);
1572         RETURN(rc);
1573 }
1574
1575 char dbgcksum_file_name[PATH_MAX];
1576
1577 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1578                                 struct brw_page **pga, __u32 server_cksum,
1579                                 __u32 client_cksum)
1580 {
1581         struct file *filp;
1582         int rc, i;
1583         unsigned int len;
1584         char *buf;
1585
1586         /* will only keep dump of pages on first error for the same range in
1587          * file/fid, not during the resends/retries. */
1588         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1589                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1590                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1591                   libcfs_debug_file_path_arr :
1592                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1593                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1594                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1595                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1596                  pga[0]->off,
1597                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1598                  client_cksum, server_cksum);
1599         filp = filp_open(dbgcksum_file_name,
1600                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1601         if (IS_ERR(filp)) {
1602                 rc = PTR_ERR(filp);
1603                 if (rc == -EEXIST)
1604                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1605                                "checksum error: rc = %d\n", dbgcksum_file_name,
1606                                rc);
1607                 else
1608                         CERROR("%s: can't open to dump pages with checksum "
1609                                "error: rc = %d\n", dbgcksum_file_name, rc);
1610                 return;
1611         }
1612
1613         for (i = 0; i < page_count; i++) {
1614                 len = pga[i]->count;
1615                 buf = kmap(pga[i]->pg);
1616                 while (len != 0) {
1617                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1618                         if (rc < 0) {
1619                                 CERROR("%s: wanted to write %u but got %d "
1620                                        "error\n", dbgcksum_file_name, len, rc);
1621                                 break;
1622                         }
1623                         len -= rc;
1624                         buf += rc;
1625                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1626                                dbgcksum_file_name, rc);
1627                 }
1628                 kunmap(pga[i]->pg);
1629         }
1630
1631         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1632         if (rc)
1633                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1634         filp_close(filp, NULL);
1635         return;
1636 }
1637
1638 static int
1639 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1640                      __u32 client_cksum, __u32 server_cksum,
1641                      struct osc_brw_async_args *aa)
1642 {
1643         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1644         enum cksum_types cksum_type;
1645         obd_dif_csum_fn *fn = NULL;
1646         int sector_size = 0;
1647         __u32 new_cksum;
1648         char *msg;
1649         int rc;
1650
1651         if (server_cksum == client_cksum) {
1652                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1653                 return 0;
1654         }
1655
1656         if (aa->aa_cli->cl_checksum_dump)
1657                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1658                                     server_cksum, client_cksum);
1659
1660         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1661                                            oa->o_flags : 0);
1662
1663         switch (cksum_type) {
1664         case OBD_CKSUM_T10IP512:
1665                 fn = obd_dif_ip_fn;
1666                 sector_size = 512;
1667                 break;
1668         case OBD_CKSUM_T10IP4K:
1669                 fn = obd_dif_ip_fn;
1670                 sector_size = 4096;
1671                 break;
1672         case OBD_CKSUM_T10CRC512:
1673                 fn = obd_dif_crc_fn;
1674                 sector_size = 512;
1675                 break;
1676         case OBD_CKSUM_T10CRC4K:
1677                 fn = obd_dif_crc_fn;
1678                 sector_size = 4096;
1679                 break;
1680         default:
1681                 break;
1682         }
1683
1684         if (fn)
1685                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1686                                              aa->aa_page_count, aa->aa_ppga,
1687                                              OST_WRITE, fn, sector_size,
1688                                              &new_cksum);
1689         else
1690                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1691                                        aa->aa_ppga, OST_WRITE, cksum_type,
1692                                        &new_cksum);
1693
1694         if (rc < 0)
1695                 msg = "failed to calculate the client write checksum";
1696         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1697                 msg = "the server did not use the checksum type specified in "
1698                       "the original request - likely a protocol problem";
1699         else if (new_cksum == server_cksum)
1700                 msg = "changed on the client after we checksummed it - "
1701                       "likely false positive due to mmap IO (bug 11742)";
1702         else if (new_cksum == client_cksum)
1703                 msg = "changed in transit before arrival at OST";
1704         else
1705                 msg = "changed in transit AND doesn't match the original - "
1706                       "likely false positive due to mmap IO (bug 11742)";
1707
1708         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1709                            DFID " object "DOSTID" extent [%llu-%llu], original "
1710                            "client csum %x (type %x), server csum %x (type %x),"
1711                            " client csum now %x\n",
1712                            obd_name, msg, libcfs_nid2str(peer->nid),
1713                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1714                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1715                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1716                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1717                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1718                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1719                            client_cksum,
1720                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1721                            server_cksum, cksum_type, new_cksum);
1722         return 1;
1723 }
1724
1725 /* Note rc enters this function as number of bytes transferred */
1726 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1727 {
1728         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1729         struct client_obd *cli = aa->aa_cli;
1730         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1731         const struct lnet_process_id *peer =
1732                 &req->rq_import->imp_connection->c_peer;
1733         struct ost_body *body;
1734         u32 client_cksum = 0;
1735         ENTRY;
1736
1737         if (rc < 0 && rc != -EDQUOT) {
1738                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1739                 RETURN(rc);
1740         }
1741
1742         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1743         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1744         if (body == NULL) {
1745                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1746                 RETURN(-EPROTO);
1747         }
1748
1749         /* set/clear over quota flag for a uid/gid/projid */
1750         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1751             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1752                 unsigned qid[LL_MAXQUOTAS] = {
1753                                          body->oa.o_uid, body->oa.o_gid,
1754                                          body->oa.o_projid };
1755                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1756                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1757                        body->oa.o_valid, body->oa.o_flags);
1758                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1759                                        body->oa.o_flags);
1760         }
1761
1762         osc_update_grant(cli, body);
1763
1764         if (rc < 0)
1765                 RETURN(rc);
1766
1767         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1768                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1769
1770         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1771                 if (rc > 0) {
1772                         CERROR("Unexpected +ve rc %d\n", rc);
1773                         RETURN(-EPROTO);
1774                 }
1775
1776                 if (req->rq_bulk != NULL &&
1777                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1778                         RETURN(-EAGAIN);
1779
1780                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1781                     check_write_checksum(&body->oa, peer, client_cksum,
1782                                          body->oa.o_cksum, aa))
1783                         RETURN(-EAGAIN);
1784
1785                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1786                                      aa->aa_page_count, aa->aa_ppga);
1787                 GOTO(out, rc);
1788         }
1789
1790         /* The rest of this function executes only for OST_READs */
1791
1792         if (req->rq_bulk == NULL) {
1793                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1794                                           RCL_SERVER);
1795                 LASSERT(rc == req->rq_status);
1796         } else {
1797                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1798                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1799         }
1800         if (rc < 0)
1801                 GOTO(out, rc = -EAGAIN);
1802
1803         if (rc > aa->aa_requested_nob) {
1804                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1805                        aa->aa_requested_nob);
1806                 RETURN(-EPROTO);
1807         }
1808
1809         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1810                 CERROR ("Unexpected rc %d (%d transferred)\n",
1811                         rc, req->rq_bulk->bd_nob_transferred);
1812                 return (-EPROTO);
1813         }
1814
1815         if (req->rq_bulk == NULL) {
1816                 /* short io */
1817                 int nob, pg_count, i = 0;
1818                 unsigned char *buf;
1819
1820                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1821                 pg_count = aa->aa_page_count;
1822                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1823                                                    rc);
1824                 nob = rc;
1825                 while (nob > 0 && pg_count > 0) {
1826                         unsigned char *ptr;
1827                         int count = aa->aa_ppga[i]->count > nob ?
1828                                     nob : aa->aa_ppga[i]->count;
1829
1830                         CDEBUG(D_CACHE, "page %p count %d\n",
1831                                aa->aa_ppga[i]->pg, count);
1832                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1833                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1834                                count);
1835                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1836
1837                         buf += count;
1838                         nob -= count;
1839                         i++;
1840                         pg_count--;
1841                 }
1842         }
1843
1844         if (rc < aa->aa_requested_nob)
1845                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1846
1847         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1848                 static int cksum_counter;
1849                 u32        server_cksum = body->oa.o_cksum;
1850                 char      *via = "";
1851                 char      *router = "";
1852                 enum cksum_types cksum_type;
1853                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1854                         body->oa.o_flags : 0;
1855
1856                 cksum_type = obd_cksum_type_unpack(o_flags);
1857                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1858                                           aa->aa_page_count, aa->aa_ppga,
1859                                           OST_READ, &client_cksum);
1860                 if (rc < 0)
1861                         GOTO(out, rc);
1862
1863                 if (req->rq_bulk != NULL &&
1864                     peer->nid != req->rq_bulk->bd_sender) {
1865                         via = " via ";
1866                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1867                 }
1868
1869                 if (server_cksum != client_cksum) {
1870                         struct ost_body *clbody;
1871                         u32 page_count = aa->aa_page_count;
1872
1873                         clbody = req_capsule_client_get(&req->rq_pill,
1874                                                         &RMF_OST_BODY);
1875                         if (cli->cl_checksum_dump)
1876                                 dump_all_bulk_pages(&clbody->oa, page_count,
1877                                                     aa->aa_ppga, server_cksum,
1878                                                     client_cksum);
1879
1880                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1881                                            "%s%s%s inode "DFID" object "DOSTID
1882                                            " extent [%llu-%llu], client %x, "
1883                                            "server %x, cksum_type %x\n",
1884                                            obd_name,
1885                                            libcfs_nid2str(peer->nid),
1886                                            via, router,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_seq : 0ULL,
1889                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1890                                                 clbody->oa.o_parent_oid : 0,
1891                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1892                                                 clbody->oa.o_parent_ver : 0,
1893                                            POSTID(&body->oa.o_oi),
1894                                            aa->aa_ppga[0]->off,
1895                                            aa->aa_ppga[page_count-1]->off +
1896                                            aa->aa_ppga[page_count-1]->count - 1,
1897                                            client_cksum, server_cksum,
1898                                            cksum_type);
1899                         cksum_counter = 0;
1900                         aa->aa_oa->o_cksum = client_cksum;
1901                         rc = -EAGAIN;
1902                 } else {
1903                         cksum_counter++;
1904                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1905                         rc = 0;
1906                 }
1907         } else if (unlikely(client_cksum)) {
1908                 static int cksum_missed;
1909
1910                 cksum_missed++;
1911                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1912                         CERROR("Checksum %u requested from %s but not sent\n",
1913                                cksum_missed, libcfs_nid2str(peer->nid));
1914         } else {
1915                 rc = 0;
1916         }
1917 out:
1918         if (rc >= 0)
1919                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1920                                      aa->aa_oa, &body->oa);
1921
1922         RETURN(rc);
1923 }
1924
1925 static int osc_brw_redo_request(struct ptlrpc_request *request,
1926                                 struct osc_brw_async_args *aa, int rc)
1927 {
1928         struct ptlrpc_request *new_req;
1929         struct osc_brw_async_args *new_aa;
1930         struct osc_async_page *oap;
1931         ENTRY;
1932
1933         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1934                   "redo for recoverable error %d", rc);
1935
1936         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1937                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1938                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1939                                   aa->aa_ppga, &new_req, 1);
1940         if (rc)
1941                 RETURN(rc);
1942
1943         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1944                 if (oap->oap_request != NULL) {
1945                         LASSERTF(request == oap->oap_request,
1946                                  "request %p != oap_request %p\n",
1947                                  request, oap->oap_request);
1948                         if (oap->oap_interrupted) {
1949                                 ptlrpc_req_finished(new_req);
1950                                 RETURN(-EINTR);
1951                         }
1952                 }
1953         }
1954         /*
1955          * New request takes over pga and oaps from old request.
1956          * Note that copying a list_head doesn't work, need to move it...
1957          */
1958         aa->aa_resends++;
1959         new_req->rq_interpret_reply = request->rq_interpret_reply;
1960         new_req->rq_async_args = request->rq_async_args;
1961         new_req->rq_commit_cb = request->rq_commit_cb;
1962         /* cap resend delay to the current request timeout, this is similar to
1963          * what ptlrpc does (see after_reply()) */
1964         if (aa->aa_resends > new_req->rq_timeout)
1965                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1966         else
1967                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1968         new_req->rq_generation_set = 1;
1969         new_req->rq_import_generation = request->rq_import_generation;
1970
1971         new_aa = ptlrpc_req_async_args(new_req);
1972
1973         INIT_LIST_HEAD(&new_aa->aa_oaps);
1974         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1975         INIT_LIST_HEAD(&new_aa->aa_exts);
1976         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1977         new_aa->aa_resends = aa->aa_resends;
1978
1979         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1980                 if (oap->oap_request) {
1981                         ptlrpc_req_finished(oap->oap_request);
1982                         oap->oap_request = ptlrpc_request_addref(new_req);
1983                 }
1984         }
1985
1986         /* XXX: This code will run into problem if we're going to support
1987          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1988          * and wait for all of them to be finished. We should inherit request
1989          * set from old request. */
1990         ptlrpcd_add_req(new_req);
1991
1992         DEBUG_REQ(D_INFO, new_req, "new request");
1993         RETURN(0);
1994 }
1995
1996 /*
1997  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1998  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1999  * fine for our small page arrays and doesn't require allocation.  its an
2000  * insertion sort that swaps elements that are strides apart, shrinking the
2001  * stride down until its '1' and the array is sorted.
2002  */
2003 static void sort_brw_pages(struct brw_page **array, int num)
2004 {
2005         int stride, i, j;
2006         struct brw_page *tmp;
2007
2008         if (num == 1)
2009                 return;
2010         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2011                 ;
2012
2013         do {
2014                 stride /= 3;
2015                 for (i = stride ; i < num ; i++) {
2016                         tmp = array[i];
2017                         j = i;
2018                         while (j >= stride && array[j - stride]->off > tmp->off) {
2019                                 array[j] = array[j - stride];
2020                                 j -= stride;
2021                         }
2022                         array[j] = tmp;
2023                 }
2024         } while (stride > 1);
2025 }
2026
2027 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2028 {
2029         LASSERT(ppga != NULL);
2030         OBD_FREE(ppga, sizeof(*ppga) * count);
2031 }
2032
2033 static int brw_interpret(const struct lu_env *env,
2034                          struct ptlrpc_request *req, void *args, int rc)
2035 {
2036         struct osc_brw_async_args *aa = args;
2037         struct osc_extent *ext;
2038         struct osc_extent *tmp;
2039         struct client_obd *cli = aa->aa_cli;
2040         unsigned long transferred = 0;
2041
2042         ENTRY;
2043
2044         rc = osc_brw_fini_request(req, rc);
2045         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2046         /*
2047          * When server returns -EINPROGRESS, client should always retry
2048          * regardless of the number of times the bulk was resent already.
2049          */
2050         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2051                 if (req->rq_import_generation !=
2052                     req->rq_import->imp_generation) {
2053                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2054                                ""DOSTID", rc = %d.\n",
2055                                req->rq_import->imp_obd->obd_name,
2056                                POSTID(&aa->aa_oa->o_oi), rc);
2057                 } else if (rc == -EINPROGRESS ||
2058                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2059                         rc = osc_brw_redo_request(req, aa, rc);
2060                 } else {
2061                         CERROR("%s: too many resent retries for object: "
2062                                "%llu:%llu, rc = %d.\n",
2063                                req->rq_import->imp_obd->obd_name,
2064                                POSTID(&aa->aa_oa->o_oi), rc);
2065                 }
2066
2067                 if (rc == 0)
2068                         RETURN(0);
2069                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2070                         rc = -EIO;
2071         }
2072
2073         if (rc == 0) {
2074                 struct obdo *oa = aa->aa_oa;
2075                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2076                 unsigned long valid = 0;
2077                 struct cl_object *obj;
2078                 struct osc_async_page *last;
2079
2080                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2081                 obj = osc2cl(last->oap_obj);
2082
2083                 cl_object_attr_lock(obj);
2084                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2085                         attr->cat_blocks = oa->o_blocks;
2086                         valid |= CAT_BLOCKS;
2087                 }
2088                 if (oa->o_valid & OBD_MD_FLMTIME) {
2089                         attr->cat_mtime = oa->o_mtime;
2090                         valid |= CAT_MTIME;
2091                 }
2092                 if (oa->o_valid & OBD_MD_FLATIME) {
2093                         attr->cat_atime = oa->o_atime;
2094                         valid |= CAT_ATIME;
2095                 }
2096                 if (oa->o_valid & OBD_MD_FLCTIME) {
2097                         attr->cat_ctime = oa->o_ctime;
2098                         valid |= CAT_CTIME;
2099                 }
2100
2101                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2102                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2103                         loff_t last_off = last->oap_count + last->oap_obj_off +
2104                                 last->oap_page_off;
2105
2106                         /* Change file size if this is an out of quota or
2107                          * direct IO write and it extends the file size */
2108                         if (loi->loi_lvb.lvb_size < last_off) {
2109                                 attr->cat_size = last_off;
2110                                 valid |= CAT_SIZE;
2111                         }
2112                         /* Extend KMS if it's not a lockless write */
2113                         if (loi->loi_kms < last_off &&
2114                             oap2osc_page(last)->ops_srvlock == 0) {
2115                                 attr->cat_kms = last_off;
2116                                 valid |= CAT_KMS;
2117                         }
2118                 }
2119
2120                 if (valid != 0)
2121                         cl_object_attr_update(env, obj, attr, valid);
2122                 cl_object_attr_unlock(obj);
2123         }
2124         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2125
2126         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2127                 osc_inc_unstable_pages(req);
2128
2129         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2130                 list_del_init(&ext->oe_link);
2131                 osc_extent_finish(env, ext, 1,
2132                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2133         }
2134         LASSERT(list_empty(&aa->aa_exts));
2135         LASSERT(list_empty(&aa->aa_oaps));
2136
2137         transferred = (req->rq_bulk == NULL ? /* short io */
2138                        aa->aa_requested_nob :
2139                        req->rq_bulk->bd_nob_transferred);
2140
2141         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2142         ptlrpc_lprocfs_brw(req, transferred);
2143
2144         spin_lock(&cli->cl_loi_list_lock);
2145         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2146          * is called so we know whether to go to sync BRWs or wait for more
2147          * RPCs to complete */
2148         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2149                 cli->cl_w_in_flight--;
2150         else
2151                 cli->cl_r_in_flight--;
2152         osc_wake_cache_waiters(cli);
2153         spin_unlock(&cli->cl_loi_list_lock);
2154
2155         osc_io_unplug(env, cli, NULL);
2156         RETURN(rc);
2157 }
2158
2159 static void brw_commit(struct ptlrpc_request *req)
2160 {
2161         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2162          * this called via the rq_commit_cb, I need to ensure
2163          * osc_dec_unstable_pages is still called. Otherwise unstable
2164          * pages may be leaked. */
2165         spin_lock(&req->rq_lock);
2166         if (likely(req->rq_unstable)) {
2167                 req->rq_unstable = 0;
2168                 spin_unlock(&req->rq_lock);
2169
2170                 osc_dec_unstable_pages(req);
2171         } else {
2172                 req->rq_committed = 1;
2173                 spin_unlock(&req->rq_lock);
2174         }
2175 }
2176
2177 /**
2178  * Build an RPC by the list of extent @ext_list. The caller must ensure
2179  * that the total pages in this list are NOT over max pages per RPC.
2180  * Extents in the list must be in OES_RPC state.
2181  */
2182 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2183                   struct list_head *ext_list, int cmd)
2184 {
2185         struct ptlrpc_request           *req = NULL;
2186         struct osc_extent               *ext;
2187         struct brw_page                 **pga = NULL;
2188         struct osc_brw_async_args       *aa = NULL;
2189         struct obdo                     *oa = NULL;
2190         struct osc_async_page           *oap;
2191         struct osc_object               *obj = NULL;
2192         struct cl_req_attr              *crattr = NULL;
2193         loff_t                          starting_offset = OBD_OBJECT_EOF;
2194         loff_t                          ending_offset = 0;
2195         int                             mpflag = 0;
2196         int                             mem_tight = 0;
2197         int                             page_count = 0;
2198         bool                            soft_sync = false;
2199         bool                            interrupted = false;
2200         bool                            ndelay = false;
2201         int                             i;
2202         int                             grant = 0;
2203         int                             rc;
2204         __u32                           layout_version = 0;
2205         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2206         struct ost_body                 *body;
2207         ENTRY;
2208         LASSERT(!list_empty(ext_list));
2209
2210         /* add pages into rpc_list to build BRW rpc */
2211         list_for_each_entry(ext, ext_list, oe_link) {
2212                 LASSERT(ext->oe_state == OES_RPC);
2213                 mem_tight |= ext->oe_memalloc;
2214                 grant += ext->oe_grants;
2215                 page_count += ext->oe_nr_pages;
2216                 layout_version = MAX(layout_version, ext->oe_layout_version);
2217                 if (obj == NULL)
2218                         obj = ext->oe_obj;
2219         }
2220
2221         soft_sync = osc_over_unstable_soft_limit(cli);
2222         if (mem_tight)
2223                 mpflag = cfs_memory_pressure_get_and_set();
2224
2225         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2226         if (pga == NULL)
2227                 GOTO(out, rc = -ENOMEM);
2228
2229         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2230         if (oa == NULL)
2231                 GOTO(out, rc = -ENOMEM);
2232
2233         i = 0;
2234         list_for_each_entry(ext, ext_list, oe_link) {
2235                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2236                         if (mem_tight)
2237                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2238                         if (soft_sync)
2239                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2240                         pga[i] = &oap->oap_brw_page;
2241                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2242                         i++;
2243
2244                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2245                         if (starting_offset == OBD_OBJECT_EOF ||
2246                             starting_offset > oap->oap_obj_off)
2247                                 starting_offset = oap->oap_obj_off;
2248                         else
2249                                 LASSERT(oap->oap_page_off == 0);
2250                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2251                                 ending_offset = oap->oap_obj_off +
2252                                                 oap->oap_count;
2253                         else
2254                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2255                                         PAGE_SIZE);
2256                         if (oap->oap_interrupted)
2257                                 interrupted = true;
2258                 }
2259                 if (ext->oe_ndelay)
2260                         ndelay = true;
2261         }
2262
2263         /* first page in the list */
2264         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2265
2266         crattr = &osc_env_info(env)->oti_req_attr;
2267         memset(crattr, 0, sizeof(*crattr));
2268         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2269         crattr->cra_flags = ~0ULL;
2270         crattr->cra_page = oap2cl_page(oap);
2271         crattr->cra_oa = oa;
2272         cl_req_attr_set(env, osc2cl(obj), crattr);
2273
2274         if (cmd == OBD_BRW_WRITE) {
2275                 oa->o_grant_used = grant;
2276                 if (layout_version > 0) {
2277                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2278                                PFID(&oa->o_oi.oi_fid), layout_version);
2279
2280                         oa->o_layout_version = layout_version;
2281                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2282                 }
2283         }
2284
2285         sort_brw_pages(pga, page_count);
2286         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2287         if (rc != 0) {
2288                 CERROR("prep_req failed: %d\n", rc);
2289                 GOTO(out, rc);
2290         }
2291
2292         req->rq_commit_cb = brw_commit;
2293         req->rq_interpret_reply = brw_interpret;
2294         req->rq_memalloc = mem_tight != 0;
2295         oap->oap_request = ptlrpc_request_addref(req);
2296         if (interrupted && !req->rq_intr)
2297                 ptlrpc_mark_interrupted(req);
2298         if (ndelay) {
2299                 req->rq_no_resend = req->rq_no_delay = 1;
2300                 /* probably set a shorter timeout value.
2301                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2302                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2303         }
2304
2305         /* Need to update the timestamps after the request is built in case
2306          * we race with setattr (locally or in queue at OST).  If OST gets
2307          * later setattr before earlier BRW (as determined by the request xid),
2308          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2309          * way to do this in a single call.  bug 10150 */
2310         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2311         crattr->cra_oa = &body->oa;
2312         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2313         cl_req_attr_set(env, osc2cl(obj), crattr);
2314         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2315
2316         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2317         aa = ptlrpc_req_async_args(req);
2318         INIT_LIST_HEAD(&aa->aa_oaps);
2319         list_splice_init(&rpc_list, &aa->aa_oaps);
2320         INIT_LIST_HEAD(&aa->aa_exts);
2321         list_splice_init(ext_list, &aa->aa_exts);
2322
2323         spin_lock(&cli->cl_loi_list_lock);
2324         starting_offset >>= PAGE_SHIFT;
2325         if (cmd == OBD_BRW_READ) {
2326                 cli->cl_r_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2330                                       starting_offset + 1);
2331         } else {
2332                 cli->cl_w_in_flight++;
2333                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2334                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2335                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2336                                       starting_offset + 1);
2337         }
2338         spin_unlock(&cli->cl_loi_list_lock);
2339
2340         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2341                   page_count, aa, cli->cl_r_in_flight,
2342                   cli->cl_w_in_flight);
2343         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2344
2345         ptlrpcd_add_req(req);
2346         rc = 0;
2347         EXIT;
2348
2349 out:
2350         if (mem_tight != 0)
2351                 cfs_memory_pressure_restore(mpflag);
2352
2353         if (rc != 0) {
2354                 LASSERT(req == NULL);
2355
2356                 if (oa)
2357                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2358                 if (pga)
2359                         OBD_FREE(pga, sizeof(*pga) * page_count);
2360                 /* this should happen rarely and is pretty bad, it makes the
2361                  * pending list not follow the dirty order */
2362                 while (!list_empty(ext_list)) {
2363                         ext = list_entry(ext_list->next, struct osc_extent,
2364                                          oe_link);
2365                         list_del_init(&ext->oe_link);
2366                         osc_extent_finish(env, ext, 0, rc);
2367                 }
2368         }
2369         RETURN(rc);
2370 }
2371
2372 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2373 {
2374         int set = 0;
2375
2376         LASSERT(lock != NULL);
2377
2378         lock_res_and_lock(lock);
2379
2380         if (lock->l_ast_data == NULL)
2381                 lock->l_ast_data = data;
2382         if (lock->l_ast_data == data)
2383                 set = 1;
2384
2385         unlock_res_and_lock(lock);
2386
2387         return set;
2388 }
2389
2390 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2391                      void *cookie, struct lustre_handle *lockh,
2392                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2393                      int errcode)
2394 {
2395         bool intent = *flags & LDLM_FL_HAS_INTENT;
2396         int rc;
2397         ENTRY;
2398
2399         /* The request was created before ldlm_cli_enqueue call. */
2400         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2401                 struct ldlm_reply *rep;
2402
2403                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2404                 LASSERT(rep != NULL);
2405
2406                 rep->lock_policy_res1 =
2407                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2408                 if (rep->lock_policy_res1)
2409                         errcode = rep->lock_policy_res1;
2410                 if (!speculative)
2411                         *flags |= LDLM_FL_LVB_READY;
2412         } else if (errcode == ELDLM_OK) {
2413                 *flags |= LDLM_FL_LVB_READY;
2414         }
2415
2416         /* Call the update callback. */
2417         rc = (*upcall)(cookie, lockh, errcode);
2418
2419         /* release the reference taken in ldlm_cli_enqueue() */
2420         if (errcode == ELDLM_LOCK_MATCHED)
2421                 errcode = ELDLM_OK;
2422         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2423                 ldlm_lock_decref(lockh, mode);
2424
2425         RETURN(rc);
2426 }
2427
2428 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2429                           void *args, int rc)
2430 {
2431         struct osc_enqueue_args *aa = args;
2432         struct ldlm_lock *lock;
2433         struct lustre_handle *lockh = &aa->oa_lockh;
2434         enum ldlm_mode mode = aa->oa_mode;
2435         struct ost_lvb *lvb = aa->oa_lvb;
2436         __u32 lvb_len = sizeof(*lvb);
2437         __u64 flags = 0;
2438
2439         ENTRY;
2440
2441         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2442          * be valid. */
2443         lock = ldlm_handle2lock(lockh);
2444         LASSERTF(lock != NULL,
2445                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2446                  lockh->cookie, req, aa);
2447
2448         /* Take an additional reference so that a blocking AST that
2449          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2450          * to arrive after an upcall has been executed by
2451          * osc_enqueue_fini(). */
2452         ldlm_lock_addref(lockh, mode);
2453
2454         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2455         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2456
2457         /* Let CP AST to grant the lock first. */
2458         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2459
2460         if (aa->oa_speculative) {
2461                 LASSERT(aa->oa_lvb == NULL);
2462                 LASSERT(aa->oa_flags == NULL);
2463                 aa->oa_flags = &flags;
2464         }
2465
2466         /* Complete obtaining the lock procedure. */
2467         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2468                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2469                                    lockh, rc);
2470         /* Complete osc stuff. */
2471         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2472                               aa->oa_flags, aa->oa_speculative, rc);
2473
2474         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2475
2476         ldlm_lock_decref(lockh, mode);
2477         LDLM_LOCK_PUT(lock);
2478         RETURN(rc);
2479 }
2480
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
2483 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2484  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2485  * other synchronous requests, however keeping some locks and trying to obtain
2486  * others may take a considerable amount of time in a case of ost failure; and
2487  * when other sync requests do not get released lock from a client, the client
2488  * is evicted from the cluster -- such scenarious make the life difficult, so
2489  * release locks just after they are obtained. */
2490 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2491                      __u64 *flags, union ldlm_policy_data *policy,
2492                      struct ost_lvb *lvb, int kms_valid,
2493                      osc_enqueue_upcall_f upcall, void *cookie,
2494                      struct ldlm_enqueue_info *einfo,
2495                      struct ptlrpc_request_set *rqset, int async,
2496                      bool speculative)
2497 {
2498         struct obd_device *obd = exp->exp_obd;
2499         struct lustre_handle lockh = { 0 };
2500         struct ptlrpc_request *req = NULL;
2501         int intent = *flags & LDLM_FL_HAS_INTENT;
2502         __u64 match_flags = *flags;
2503         enum ldlm_mode mode;
2504         int rc;
2505         ENTRY;
2506
2507         /* Filesystem lock extents are extended to page boundaries so that
2508          * dealing with the page cache is a little smoother.  */
2509         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2510         policy->l_extent.end |= ~PAGE_MASK;
2511
2512         /*
2513          * kms is not valid when either object is completely fresh (so that no
2514          * locks are cached), or object was evicted. In the latter case cached
2515          * lock cannot be used, because it would prime inode state with
2516          * potentially stale LVB.
2517          */
2518         if (!kms_valid)
2519                 goto no_match;
2520
2521         /* Next, search for already existing extent locks that will cover us */
2522         /* If we're trying to read, we also search for an existing PW lock.  The
2523          * VFS and page cache already protect us locally, so lots of readers/
2524          * writers can share a single PW lock.
2525          *
2526          * There are problems with conversion deadlocks, so instead of
2527          * converting a read lock to a write lock, we'll just enqueue a new
2528          * one.
2529          *
2530          * At some point we should cancel the read lock instead of making them
2531          * send us a blocking callback, but there are problems with canceling
2532          * locks out from other users right now, too. */
2533         mode = einfo->ei_mode;
2534         if (einfo->ei_mode == LCK_PR)
2535                 mode |= LCK_PW;
2536         /* Normal lock requests must wait for the LVB to be ready before
2537          * matching a lock; speculative lock requests do not need to,
2538          * because they will not actually use the lock. */
2539         if (!speculative)
2540                 match_flags |= LDLM_FL_LVB_READY;
2541         if (intent != 0)
2542                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2543         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2544                                einfo->ei_type, policy, mode, &lockh, 0);
2545         if (mode) {
2546                 struct ldlm_lock *matched;
2547
2548                 if (*flags & LDLM_FL_TEST_LOCK)
2549                         RETURN(ELDLM_OK);
2550
2551                 matched = ldlm_handle2lock(&lockh);
2552                 if (speculative) {
2553                         /* This DLM lock request is speculative, and does not
2554                          * have an associated IO request. Therefore if there
2555                          * is already a DLM lock, it wll just inform the
2556                          * caller to cancel the request for this stripe.*/
2557                         lock_res_and_lock(matched);
2558                         if (ldlm_extent_equal(&policy->l_extent,
2559                             &matched->l_policy_data.l_extent))
2560                                 rc = -EEXIST;
2561                         else
2562                                 rc = -ECANCELED;
2563                         unlock_res_and_lock(matched);
2564
2565                         ldlm_lock_decref(&lockh, mode);
2566                         LDLM_LOCK_PUT(matched);
2567                         RETURN(rc);
2568                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2569                         *flags |= LDLM_FL_LVB_READY;
2570
2571                         /* We already have a lock, and it's referenced. */
2572                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2573
2574                         ldlm_lock_decref(&lockh, mode);
2575                         LDLM_LOCK_PUT(matched);
2576                         RETURN(ELDLM_OK);
2577                 } else {
2578                         ldlm_lock_decref(&lockh, mode);
2579                         LDLM_LOCK_PUT(matched);
2580                 }
2581         }
2582
2583 no_match:
2584         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2585                 RETURN(-ENOLCK);
2586
2587         if (intent) {
2588                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2589                                            &RQF_LDLM_ENQUEUE_LVB);
2590                 if (req == NULL)
2591                         RETURN(-ENOMEM);
2592
2593                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2594                 if (rc) {
2595                         ptlrpc_request_free(req);
2596                         RETURN(rc);
2597                 }
2598
2599                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2600                                      sizeof *lvb);
2601                 ptlrpc_request_set_replen(req);
2602         }
2603
2604         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2605         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2606
2607         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2608                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2609         if (async) {
2610                 if (!rc) {
2611                         struct osc_enqueue_args *aa;
2612                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2613                         aa = ptlrpc_req_async_args(req);
2614                         aa->oa_exp         = exp;
2615                         aa->oa_mode        = einfo->ei_mode;
2616                         aa->oa_type        = einfo->ei_type;
2617                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2618                         aa->oa_upcall      = upcall;
2619                         aa->oa_cookie      = cookie;
2620                         aa->oa_speculative = speculative;
2621                         if (!speculative) {
2622                                 aa->oa_flags  = flags;
2623                                 aa->oa_lvb    = lvb;
2624                         } else {
2625                                 /* speculative locks are essentially to enqueue
2626                                  * a DLM lock  in advance, so we don't care
2627                                  * about the result of the enqueue. */
2628                                 aa->oa_lvb    = NULL;
2629                                 aa->oa_flags  = NULL;
2630                         }
2631
2632                         req->rq_interpret_reply = osc_enqueue_interpret;
2633                         if (rqset == PTLRPCD_SET)
2634                                 ptlrpcd_add_req(req);
2635                         else
2636                                 ptlrpc_set_add_req(rqset, req);
2637                 } else if (intent) {
2638                         ptlrpc_req_finished(req);
2639                 }
2640                 RETURN(rc);
2641         }
2642
2643         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2644                               flags, speculative, rc);
2645         if (intent)
2646                 ptlrpc_req_finished(req);
2647
2648         RETURN(rc);
2649 }
2650
2651 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2652                    enum ldlm_type type, union ldlm_policy_data *policy,
2653                    enum ldlm_mode mode, __u64 *flags, void *data,
2654                    struct lustre_handle *lockh, int unref)
2655 {
2656         struct obd_device *obd = exp->exp_obd;
2657         __u64 lflags = *flags;
2658         enum ldlm_mode rc;
2659         ENTRY;
2660
2661         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2662                 RETURN(-EIO);
2663
2664         /* Filesystem lock extents are extended to page boundaries so that
2665          * dealing with the page cache is a little smoother */
2666         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2667         policy->l_extent.end |= ~PAGE_MASK;
2668
2669         /* Next, search for already existing extent locks that will cover us */
2670         /* If we're trying to read, we also search for an existing PW lock.  The
2671          * VFS and page cache already protect us locally, so lots of readers/
2672          * writers can share a single PW lock. */
2673         rc = mode;
2674         if (mode == LCK_PR)
2675                 rc |= LCK_PW;
2676         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2677                              res_id, type, policy, rc, lockh, unref);
2678         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2679                 RETURN(rc);
2680
2681         if (data != NULL) {
2682                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2683
2684                 LASSERT(lock != NULL);
2685                 if (!osc_set_lock_data(lock, data)) {
2686                         ldlm_lock_decref(lockh, rc);
2687                         rc = 0;
2688                 }
2689                 LDLM_LOCK_PUT(lock);
2690         }
2691         RETURN(rc);
2692 }
2693
2694 static int osc_statfs_interpret(const struct lu_env *env,
2695                                 struct ptlrpc_request *req, void *args, int rc)
2696 {
2697         struct osc_async_args *aa = args;
2698         struct obd_statfs *msfs;
2699
2700         ENTRY;
2701         if (rc == -EBADR)
2702                 /*
2703                  * The request has in fact never been sent due to issues at
2704                  * a higher level (LOV).  Exit immediately since the caller
2705                  * is aware of the problem and takes care of the clean up.
2706                  */
2707                 RETURN(rc);
2708
2709         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2710             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2711                 GOTO(out, rc = 0);
2712
2713         if (rc != 0)
2714                 GOTO(out, rc);
2715
2716         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2717         if (msfs == NULL)
2718                 GOTO(out, rc = -EPROTO);
2719
2720         *aa->aa_oi->oi_osfs = *msfs;
2721 out:
2722         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2723
2724         RETURN(rc);
2725 }
2726
2727 static int osc_statfs_async(struct obd_export *exp,
2728                             struct obd_info *oinfo, time64_t max_age,
2729                             struct ptlrpc_request_set *rqset)
2730 {
2731         struct obd_device     *obd = class_exp2obd(exp);
2732         struct ptlrpc_request *req;
2733         struct osc_async_args *aa;
2734         int rc;
2735         ENTRY;
2736
2737         if (obd->obd_osfs_age >= max_age) {
2738                 CDEBUG(D_SUPER,
2739                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2740                        obd->obd_name, &obd->obd_osfs,
2741                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2742                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2743                 spin_lock(&obd->obd_osfs_lock);
2744                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2745                 spin_unlock(&obd->obd_osfs_lock);
2746                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2747                 if (oinfo->oi_cb_up)
2748                         oinfo->oi_cb_up(oinfo, 0);
2749
2750                 RETURN(0);
2751         }
2752
2753         /* We could possibly pass max_age in the request (as an absolute
2754          * timestamp or a "seconds.usec ago") so the target can avoid doing
2755          * extra calls into the filesystem if that isn't necessary (e.g.
2756          * during mount that would help a bit).  Having relative timestamps
2757          * is not so great if request processing is slow, while absolute
2758          * timestamps are not ideal because they need time synchronization. */
2759         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2760         if (req == NULL)
2761                 RETURN(-ENOMEM);
2762
2763         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2764         if (rc) {
2765                 ptlrpc_request_free(req);
2766                 RETURN(rc);
2767         }
2768         ptlrpc_request_set_replen(req);
2769         req->rq_request_portal = OST_CREATE_PORTAL;
2770         ptlrpc_at_set_req_timeout(req);
2771
2772         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2773                 /* procfs requests not want stat in wait for avoid deadlock */
2774                 req->rq_no_resend = 1;
2775                 req->rq_no_delay = 1;
2776         }
2777
2778         req->rq_interpret_reply = osc_statfs_interpret;
2779         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2780         aa = ptlrpc_req_async_args(req);
2781         aa->aa_oi = oinfo;
2782
2783         ptlrpc_set_add_req(rqset, req);
2784         RETURN(0);
2785 }
2786
2787 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2788                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2789 {
2790         struct obd_device     *obd = class_exp2obd(exp);
2791         struct obd_statfs     *msfs;
2792         struct ptlrpc_request *req;
2793         struct obd_import     *imp = NULL;
2794         int rc;
2795         ENTRY;
2796
2797
2798         /*Since the request might also come from lprocfs, so we need
2799          *sync this with client_disconnect_export Bug15684*/
2800         down_read(&obd->u.cli.cl_sem);
2801         if (obd->u.cli.cl_import)
2802                 imp = class_import_get(obd->u.cli.cl_import);
2803         up_read(&obd->u.cli.cl_sem);
2804         if (!imp)
2805                 RETURN(-ENODEV);
2806
2807         /* We could possibly pass max_age in the request (as an absolute
2808          * timestamp or a "seconds.usec ago") so the target can avoid doing
2809          * extra calls into the filesystem if that isn't necessary (e.g.
2810          * during mount that would help a bit).  Having relative timestamps
2811          * is not so great if request processing is slow, while absolute
2812          * timestamps are not ideal because they need time synchronization. */
2813         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2814
2815         class_import_put(imp);
2816
2817         if (req == NULL)
2818                 RETURN(-ENOMEM);
2819
2820         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2821         if (rc) {
2822                 ptlrpc_request_free(req);
2823                 RETURN(rc);
2824         }
2825         ptlrpc_request_set_replen(req);
2826         req->rq_request_portal = OST_CREATE_PORTAL;
2827         ptlrpc_at_set_req_timeout(req);
2828
2829         if (flags & OBD_STATFS_NODELAY) {
2830                 /* procfs requests not want stat in wait for avoid deadlock */
2831                 req->rq_no_resend = 1;
2832                 req->rq_no_delay = 1;
2833         }
2834
2835         rc = ptlrpc_queue_wait(req);
2836         if (rc)
2837                 GOTO(out, rc);
2838
2839         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2840         if (msfs == NULL)
2841                 GOTO(out, rc = -EPROTO);
2842
2843         *osfs = *msfs;
2844
2845         EXIT;
2846 out:
2847         ptlrpc_req_finished(req);
2848         return rc;
2849 }
2850
2851 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2852                          void *karg, void __user *uarg)
2853 {
2854         struct obd_device *obd = exp->exp_obd;
2855         struct obd_ioctl_data *data = karg;
2856         int rc = 0;
2857
2858         ENTRY;
2859         if (!try_module_get(THIS_MODULE)) {
2860                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2861                        module_name(THIS_MODULE));
2862                 return -EINVAL;
2863         }
2864         switch (cmd) {
2865         case OBD_IOC_CLIENT_RECOVER:
2866                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2867                                            data->ioc_inlbuf1, 0);
2868                 if (rc > 0)
2869                         rc = 0;
2870                 break;
2871         case IOC_OSC_SET_ACTIVE:
2872                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2873                                               data->ioc_offset);
2874                 break;
2875         default:
2876                 rc = -ENOTTY;
2877                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2878                        obd->obd_name, cmd, current_comm(), rc);
2879                 break;
2880         }
2881
2882         module_put(THIS_MODULE);
2883         return rc;
2884 }
2885
2886 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2887                        u32 keylen, void *key, u32 vallen, void *val,
2888                        struct ptlrpc_request_set *set)
2889 {
2890         struct ptlrpc_request *req;
2891         struct obd_device     *obd = exp->exp_obd;
2892         struct obd_import     *imp = class_exp2cliimp(exp);
2893         char                  *tmp;
2894         int                    rc;
2895         ENTRY;
2896
2897         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2898
2899         if (KEY_IS(KEY_CHECKSUM)) {
2900                 if (vallen != sizeof(int))
2901                         RETURN(-EINVAL);
2902                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2903                 RETURN(0);
2904         }
2905
2906         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2907                 sptlrpc_conf_client_adapt(obd);
2908                 RETURN(0);
2909         }
2910
2911         if (KEY_IS(KEY_FLUSH_CTX)) {
2912                 sptlrpc_import_flush_my_ctx(imp);
2913                 RETURN(0);
2914         }
2915
2916         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2917                 struct client_obd *cli = &obd->u.cli;
2918                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2919                 long target = *(long *)val;
2920
2921                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2922                 *(long *)val -= nr;
2923                 RETURN(0);
2924         }
2925
2926         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2927                 RETURN(-EINVAL);
2928
2929         /* We pass all other commands directly to OST. Since nobody calls osc
2930            methods directly and everybody is supposed to go through LOV, we
2931            assume lov checked invalid values for us.
2932            The only recognised values so far are evict_by_nid and mds_conn.
2933            Even if something bad goes through, we'd get a -EINVAL from OST
2934            anyway. */
2935
2936         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2937                                                 &RQF_OST_SET_GRANT_INFO :
2938                                                 &RQF_OBD_SET_INFO);
2939         if (req == NULL)
2940                 RETURN(-ENOMEM);
2941
2942         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2943                              RCL_CLIENT, keylen);
2944         if (!KEY_IS(KEY_GRANT_SHRINK))
2945                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2946                                      RCL_CLIENT, vallen);
2947         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2948         if (rc) {
2949                 ptlrpc_request_free(req);
2950                 RETURN(rc);
2951         }
2952
2953         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2954         memcpy(tmp, key, keylen);
2955         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2956                                                         &RMF_OST_BODY :
2957                                                         &RMF_SETINFO_VAL);
2958         memcpy(tmp, val, vallen);
2959
2960         if (KEY_IS(KEY_GRANT_SHRINK)) {
2961                 struct osc_grant_args *aa;
2962                 struct obdo *oa;
2963
2964                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2965                 aa = ptlrpc_req_async_args(req);
2966                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2967                 if (!oa) {
2968                         ptlrpc_req_finished(req);
2969                         RETURN(-ENOMEM);
2970                 }
2971                 *oa = ((struct ost_body *)val)->oa;
2972                 aa->aa_oa = oa;
2973                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2974         }
2975
2976         ptlrpc_request_set_replen(req);
2977         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2978                 LASSERT(set != NULL);
2979                 ptlrpc_set_add_req(set, req);
2980                 ptlrpc_check_set(NULL, set);
2981         } else {
2982                 ptlrpcd_add_req(req);
2983         }
2984
2985         RETURN(0);
2986 }
2987 EXPORT_SYMBOL(osc_set_info_async);
2988
2989 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2990                   struct obd_device *obd, struct obd_uuid *cluuid,
2991                   struct obd_connect_data *data, void *localdata)
2992 {
2993         struct client_obd *cli = &obd->u.cli;
2994
2995         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2996                 long lost_grant;
2997                 long grant;
2998
2999                 spin_lock(&cli->cl_loi_list_lock);
3000                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3001                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3002                         /* restore ocd_grant_blkbits as client page bits */
3003                         data->ocd_grant_blkbits = PAGE_SHIFT;
3004                         grant += cli->cl_dirty_grant;
3005                 } else {
3006                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3007                 }
3008                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3009                 lost_grant = cli->cl_lost_grant;
3010                 cli->cl_lost_grant = 0;
3011                 spin_unlock(&cli->cl_loi_list_lock);
3012
3013                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3014                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3015                        data->ocd_version, data->ocd_grant, lost_grant);
3016         }
3017
3018         RETURN(0);
3019 }
3020 EXPORT_SYMBOL(osc_reconnect);
3021
3022 int osc_disconnect(struct obd_export *exp)
3023 {
3024         struct obd_device *obd = class_exp2obd(exp);
3025         int rc;
3026
3027         rc = client_disconnect_export(exp);
3028         /**
3029          * Initially we put del_shrink_grant before disconnect_export, but it
3030          * causes the following problem if setup (connect) and cleanup
3031          * (disconnect) are tangled together.
3032          *      connect p1                     disconnect p2
3033          *   ptlrpc_connect_import
3034          *     ...............               class_manual_cleanup
3035          *                                     osc_disconnect
3036          *                                     del_shrink_grant
3037          *   ptlrpc_connect_interrupt
3038          *     osc_init_grant
3039          *   add this client to shrink list
3040          *                                      cleanup_osc
3041          * Bang! grant shrink thread trigger the shrink. BUG18662
3042          */
3043         osc_del_grant_list(&obd->u.cli);
3044         return rc;
3045 }
3046 EXPORT_SYMBOL(osc_disconnect);
3047
3048 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3049                                  struct hlist_node *hnode, void *arg)
3050 {
3051         struct lu_env *env = arg;
3052         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3053         struct ldlm_lock *lock;
3054         struct osc_object *osc = NULL;
3055         ENTRY;
3056
3057         lock_res(res);
3058         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3059                 if (lock->l_ast_data != NULL && osc == NULL) {
3060                         osc = lock->l_ast_data;
3061                         cl_object_get(osc2cl(osc));
3062                 }
3063
3064                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3065                  * by the 2nd round of ldlm_namespace_clean() call in
3066                  * osc_import_event(). */
3067                 ldlm_clear_cleaned(lock);
3068         }
3069         unlock_res(res);
3070
3071         if (osc != NULL) {
3072                 osc_object_invalidate(env, osc);
3073                 cl_object_put(env, osc2cl(osc));
3074         }
3075
3076         RETURN(0);
3077 }
3078 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3079
3080 static int osc_import_event(struct obd_device *obd,
3081                             struct obd_import *imp,
3082                             enum obd_import_event event)
3083 {
3084         struct client_obd *cli;
3085         int rc = 0;
3086
3087         ENTRY;
3088         LASSERT(imp->imp_obd == obd);
3089
3090         switch (event) {
3091         case IMP_EVENT_DISCON: {
3092                 cli = &obd->u.cli;
3093                 spin_lock(&cli->cl_loi_list_lock);
3094                 cli->cl_avail_grant = 0;
3095                 cli->cl_lost_grant = 0;
3096                 spin_unlock(&cli->cl_loi_list_lock);
3097                 break;
3098         }
3099         case IMP_EVENT_INACTIVE: {
3100                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3101                 break;
3102         }
3103         case IMP_EVENT_INVALIDATE: {
3104                 struct ldlm_namespace *ns = obd->obd_namespace;
3105                 struct lu_env         *env;
3106                 __u16                  refcheck;
3107
3108                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3109
3110                 env = cl_env_get(&refcheck);
3111                 if (!IS_ERR(env)) {
3112                         osc_io_unplug(env, &obd->u.cli, NULL);
3113
3114                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3115                                                  osc_ldlm_resource_invalidate,
3116                                                  env, 0);
3117                         cl_env_put(env, &refcheck);
3118
3119                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3120                 } else
3121                         rc = PTR_ERR(env);
3122                 break;
3123         }
3124         case IMP_EVENT_ACTIVE: {
3125                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3126                 break;
3127         }
3128         case IMP_EVENT_OCD: {
3129                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3130
3131                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3132                         osc_init_grant(&obd->u.cli, ocd);
3133
3134                 /* See bug 7198 */
3135                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3136                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3137
3138                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3139                 break;
3140         }
3141         case IMP_EVENT_DEACTIVATE: {
3142                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3143                 break;
3144         }
3145         case IMP_EVENT_ACTIVATE: {
3146                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3147                 break;
3148         }
3149         default:
3150                 CERROR("Unknown import event %d\n", event);
3151                 LBUG();
3152         }
3153         RETURN(rc);
3154 }