Whamcloud - gitweb
LU-9679 osc: centralize handling of PTLRPCD_SET
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
/* Shared pool of pre-allocated brw RPC requests used by all OSC devices.
 * osc_pool_req_count tracks the current number of pooled requests and
 * osc_reqpool_maxreqcount is the upper bound derived from the pool memory
 * limit below. */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

/* NOTE(review): presumably the idle-import disconnect timeout in seconds;
 * its consumer is not visible in this chunk -- confirm against users of
 * osc_idle_timeout.  Declared int but exported as uint via module_param. */
static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);
62 #define osc_grant_args osc_brw_async_args
63
/* Per-request context for asynchronous setattr/punch RPCs; stored in the
 * request via ptlrpc_req_async_args() and read back by
 * osc_setattr_interpret(). */
struct osc_setattr_args {
	struct obdo		*sa_oa;		/* obdo updated from the reply */
	obd_enqueue_update_f	 sa_upcall;	/* completion callback */
	void			*sa_cookie;	/* opaque arg for sa_upcall */
};
69
/* Per-request context for OST_SYNC RPCs; consumed by osc_sync_interpret(). */
struct osc_fsync_args {
	struct osc_object	*fa_obj;	/* object being synced */
	struct obdo		*fa_oa;		/* obdo updated from the reply */
	obd_enqueue_update_f	fa_upcall;	/* completion callback */
	void			*fa_cookie;	/* opaque arg for fa_upcall */
};
76
/* Per-request context for OST_LADVISE RPCs; consumed by
 * osc_ladvise_interpret(). */
struct osc_ladvise_args {
	struct obdo		*la_oa;		/* obdo updated from the reply */
	obd_enqueue_update_f	 la_upcall;	/* completion callback */
	void			*la_cookie;	/* opaque arg for la_upcall */
};
82
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
85                          void *data, int rc);
86
/* Pack @oa into the OST body buffer of @req, converting the local obdo to
 * its on-wire form according to the import's negotiated connect data. */
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
96
/* Synchronous OST_GETATTR: send the attributes request identified by @oa
 * and copy the server's reply back into @oa.  The blocksize is filled in
 * locally from the export's brw size rather than from the reply.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* block until the reply (or an error) arrives */
	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* blksize is a client-side value, not part of the server reply */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);

	return rc;
}
139
/* Synchronous OST_SETATTR: push the attributes in @oa to the OST and
 * refresh @oa from the reply.  @oa must carry a valid group (OBD_MD_FLGROUP).
 * Returns 0 on success or a negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	/* propagate server-updated attributes back to the caller's obdo */
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);

	RETURN(rc);
}
180
/* Reply interpreter for async setattr/punch requests: on success copy the
 * server's obdo into sa_oa, then invoke the caller's upcall with the final
 * status regardless of success or failure. */
static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_setattr_args *sa = args;
	struct ost_body *body;

	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	/* always notify the caller, passing through (possibly updated) rc */
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}
202
/* Asynchronous OST_SETATTR.  With a request set, the request is added to
 * @rqset and @upcall/@cookie are invoked from osc_setattr_interpret() on
 * completion.  With @rqset == NULL the request is handed to ptlrpcd and
 * fired without waiting for (or interpreting) the reply.
 * Returns 0 on successful submission or a negative errno. */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct osc_setattr_args *sa;
	int			 rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply = osc_setattr_interpret;

		sa = ptlrpc_req_async_args(sa, req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
244
/* Reply interpreter for OST_LADVISE: on success copy the reply obdo back
 * to the caller's la_oa by struct assignment, then invoke the upcall with
 * the final status in all cases. */
static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}
265
/**
 * Send an OST_LADVISE RPC carrying the advice array from @ladvise_hdr.
 *
 * The request buffer is sized for lah_count lu_ladvise entries; both the
 * header and the advice array are copied verbatim into the capsule.  The
 * request goes to the OST I/O portal with an adjusted AT timeout.
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 *
 * \retval 0 on successful submission, negative errno on failure.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	struct osc_ladvise_args	*la;
	int			 rc;
	struct lu_ladvise	*req_ladvise;
	struct lu_ladvise	*ladvise = ladvise_hdr->lah_advise;
	int			 num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr	*req_ladvise_hdr;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	/* variable-length advice array: size the buffer before packing */
	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	la = ptlrpc_req_async_args(la, req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
328
/* Synchronous OST_CREATE.  Only used for echo-client objects: @oa must
 * have a group and its object sequence must be an echo sequence (see the
 * fid_seq_is_echo assertion).  On success @oa is refreshed from the reply
 * and the client-side blocksize is filled in.
 * Returns 0 on success or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		       rc;
	ENTRY;

	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* blksize is a client-side value, not part of the server reply */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}
379
/* Fire an asynchronous OST_PUNCH (truncate/fallocate-style) request via
 * ptlrpcd.  The start/end of the punched range travel inside @oa (packed
 * by lustre_set_wire_obdo); @upcall/@cookie are invoked from
 * osc_setattr_interpret() when the reply arrives.
 * Returns 0 on successful submission or a negative errno. */
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc < 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	/* NOTE(review): sibling functions LASSERT() this pointer; here it is
	 * used unchecked -- presumably guaranteed non-NULL after a successful
	 * ptlrpc_request_pack(), but worth confirming */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_setattr_interpret;
	sa = ptlrpc_req_async_args(sa, req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	/* Do not wait for response here; completion runs via ptlrpcd */
	ptlrpcd_add_req(req);

	RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
422
/* Reply interpreter for OST_SYNC: copy the reply obdo to the caller and,
 * if the reply carries a blocks count, update the osc object's cached
 * blocks attribute under the cl attr lock.  The upcall always runs. */
static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_fsync_args *fa = args;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}

	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}
460
/* Build and queue an OST_SYNC request for @obj on @rqset.  The sync range
 * is smuggled in @oa (see the overload comment below); @upcall/@cookie are
 * invoked from osc_sync_interpret() on completion.
 * Returns 0 on successful submission or a negative errno. */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export     *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		       rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	fa = ptlrpc_req_async_args(fa, req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN (0);
}
500
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	/* resolve the object id to its LDLM resource; a missing resource
	 * simply means there is nothing to cancel */
	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}
535
/* Reply interpreter for OST_DESTROY: release one slot of the in-flight
 * destroy throttle and wake any sender blocked in osc_destroy(). */
static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);

	return 0;
}
546
/* Try to reserve an in-flight slot for a destroy RPC.  Returns 1 with the
 * counter incremented if we are under cl_max_rpcs_in_flight, otherwise
 * rolls the increment back and returns 0.  The extra wake_up on the
 * decrement path handles a racing decrement between the two atomics, so
 * that no waiter in osc_destroy() is left sleeping on a free slot. */
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
564
/* Asynchronous OST_DESTROY with early lock cancellation: conflicting PW
 * locks on the object are cancelled locally (discarding dirty data) and
 * piggy-backed on the destroy request.  The number of concurrent destroy
 * RPCs is throttled to cl_max_rpcs_in_flight; the sender may block
 * (abortably) waiting for a slot.  The reply is not waited for.
 * Returns 0 on successful submission or a negative errno. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	LIST_HEAD(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		/* drop the references taken by local lock cancellation */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	/* pack the request together with the ELC cancel records */
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight
		 */
		rc = l_wait_event_abortable_exclusive(
			cli->cl_destroy_waitq,
			osc_can_send_destroy(cli));
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(-EINTR);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}
624
/* Fill the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) with this client's cache accounting so the server can manage
 * its grant.  Runs under cl_loi_list_lock; also consumes cl_lost_grant.
 * The caller must not have set OBD_MD_FLBLOCKS/OBD_MD_FLGRANT already. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	/* with GRANT_PARAM the server understands byte-accurate dirty
	 * accounting; otherwise report whole pages in bytes */
	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;
		unsigned long undirty;

		/* ask for enough grant to keep a full pipeline of RPCs
		 * in flight, but at least the dirty cache limit */
		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
				 GRANT_PARAM)) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
				     cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		}
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT &
				    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
688
/* Re-arm the grant-shrink deadline for @cli: next shrink is one
 * cl_grant_shrink_interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}
697
/* Credit @grant bytes back to the client's available grant, under the
 * loi list lock that protects grant accounting. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}
704
705 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
706 {
707         if (body->oa.o_valid & OBD_MD_FLGRANT) {
708                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
709                 __osc_update_grant(cli, body->oa.o_grant);
710         }
711 }
712
/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head	gtd_clients;	/* client_obd::cl_grant_chain list */
	struct mutex		gtd_mutex;	/* protects gtd_clients */
	unsigned long		gtd_stopped:1;	/* set to stop rescheduling work */
};
/* single global instance driving the grant-shrink delayed work */
static struct grant_thread_data client_gtd;
722
/* Reply interpreter for a grant-shrink set_info RPC: on failure give the
 * grant we tried to return back to the client; on success apply any grant
 * the server handed back in the reply.  Frees the obdo allocated by the
 * sender in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *args, int rc)
{
	struct osc_grant_args *aa = args;
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct ost_body *body;

	if (rc != 0) {
		/* shrink failed: restore the grant we had deducted locally */
		__osc_update_grant(cli, aa->aa_oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
	aa->aa_oa = NULL;

	return rc;
}
745
/* Move a quarter of the currently available grant into @oa->o_grant (to be
 * returned to the server), flag the obdo with OBD_FL_SHRINK_GRANT, and
 * re-arm the next shrink deadline. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}
759
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	/* already at (or below) the full-pipeline level: drop to one RPC */
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
776
/* Return grant to the server until cl_avail_grant reaches @target_bytes
 * (clamped to at least one RPC's worth).  The surplus is deducted locally
 * first, then announced via a KEY_GRANT_SHRINK set_info RPC; on failure
 * the deducted grant is credited back.
 * Returns 0 on success or if no shrink was needed, negative errno on error. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	       *body;
	ENTRY;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	/* re-check under the lock: grant may have been consumed while the
	 * lock was dropped for the allocation above */
	spin_lock(&cli->cl_loi_list_lock);
	if (target_bytes >= cli->cl_avail_grant) {
		/* available grant has changed since target calculation */
		spin_unlock(&cli->cl_loi_list_lock);
		GOTO(out_free, rc = 0);
	}
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		/* RPC failed: take the grant back locally */
		__osc_update_grant(cli, body->oa.o_grant);
out_free:
	OBD_FREE_PTR(body);
	RETURN(rc);
}
827
/* Decide whether @client should return grant to the server now: requires
 * GRANT_SHRINK support (and not disabled on the import), the shrink
 * deadline to be within 5 seconds, a FULL import, and more available
 * grant than a single RPC.  Re-arms the deadline when shrinking is not
 * possible or not worthwhile.  Returns 1 to shrink, 0 otherwise. */
static int osc_should_shrink_grant(struct client_obd *client)
{
	time64_t next_shrink = client->cl_next_shrink_grant;

	if (client->cl_import == NULL)
		return 0;

	if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
	    client->cl_import->imp_grant_shrink_disabled) {
		osc_update_next_shrink(client);
		return 0;
	}

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}
855
/* at most this many shrink RPCs per work-handler pass */
#define GRANT_SHRINK_RPC_BATCH	100

/* delayed work item driving periodic grant shrinking for all clients */
static struct delayed_work work;
859
/* Periodic work handler: walk all registered clients, shrink grant for up
 * to GRANT_SHRINK_RPC_BATCH of them that are due, and reschedule itself
 * for the earliest future shrink deadline (or immediately if one is
 * already overdue).  Stops rescheduling once gtd_stopped is set. */
static void osc_grant_work_handler(struct work_struct *data)
{
	struct client_obd *cli;
	int rpc_sent;
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	rpc_sent = 0;
	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
			    cl_grant_chain) {
		if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli)) {
			osc_shrink_grant(cli);
			rpc_sent++;
		}

		/* track the minimum future deadline across all clients */
		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
		} else {
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
		}
	}
	mutex_unlock(&client_gtd.gtd_mutex);

	if (client_gtd.gtd_stopped == 1)
		return;

	if (next_shrink > ktime_get_seconds()) {
		time64_t delay = next_shrink - ktime_get_seconds();

		schedule_delayed_work(&work, cfs_time_seconds(delay));
	} else {
		/* a deadline is already due: run again without delay */
		schedule_work(&work.work);
	}
}
899
/*
 * Force an immediate pass of the grant-shrink worker: cancel any
 * pending delayed run (waiting for an in-flight handler to finish),
 * then queue the handler for execution right away.
 */
void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
905
906 /**
907  * Start grant thread for returing grant to server for idle clients.
908  */
909 static int osc_start_grant_work(void)
910 {
911         client_gtd.gtd_stopped = 0;
912         mutex_init(&client_gtd.gtd_mutex);
913         INIT_LIST_HEAD(&client_gtd.gtd_clients);
914
915         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
916         schedule_work(&work.work);
917
918         return 0;
919 }
920
/*
 * Stop the grant-shrink worker: set the stopped flag first so a
 * concurrently-running handler will not reschedule itself, then cancel
 * any pending delayed work and wait for an in-flight run to complete.
 */
static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}
926
/* Register @client on the global list scanned by the grant worker. */
static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}
933
/*
 * Remove @client from the global grant-shrink client list.
 *
 * The unlocked list_empty() check is a fast path for clients that were
 * never added (e.g. no GRANT_SHRINK support); list_del_init() keeps a
 * removed entry self-pointing so a repeated call remains a no-op.
 */
static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}
943
/**
 * Initialize grant accounting for a client from server connect data.
 *
 * Called at (re)connect time: seeds cl_avail_grant from the
 * server-supplied ocd_grant, then (when the server supports
 * GRANT_PARAM) derives the chunk size, per-extent grant tax and
 * maximum extent size from the server's block-size parameters.
 * Finally registers the client with the grant-shrink worker when the
 * server supports GRANT_SHRINK.
 */
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                /* legacy server: fall back to page-granular defaults */
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
1001
1002 /* We assume that the reason this OSC got a short read is because it read
1003  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1004  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1005  * this stripe never got written at or beyond this stripe offset yet. */
1006 static void handle_short_read(int nob_read, size_t page_count,
1007                               struct brw_page **pga)
1008 {
1009         char *ptr;
1010         int i = 0;
1011
1012         /* skip bytes read OK */
1013         while (nob_read > 0) {
1014                 LASSERT (page_count > 0);
1015
1016                 if (pga[i]->count > nob_read) {
1017                         /* EOF inside this page */
1018                         ptr = kmap(pga[i]->pg) +
1019                                 (pga[i]->off & ~PAGE_MASK);
1020                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1021                         kunmap(pga[i]->pg);
1022                         page_count--;
1023                         i++;
1024                         break;
1025                 }
1026
1027                 nob_read -= pga[i]->count;
1028                 page_count--;
1029                 i++;
1030         }
1031
1032         /* zero remaining pages */
1033         while (page_count-- > 0) {
1034                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1035                 memset(ptr, 0, pga[i]->count);
1036                 kunmap(pga[i]->pg);
1037                 i++;
1038         }
1039 }
1040
1041 static int check_write_rcs(struct ptlrpc_request *req,
1042                            int requested_nob, int niocount,
1043                            size_t page_count, struct brw_page **pga)
1044 {
1045         int     i;
1046         __u32   *remote_rcs;
1047
1048         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1049                                                   sizeof(*remote_rcs) *
1050                                                   niocount);
1051         if (remote_rcs == NULL) {
1052                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1053                 return(-EPROTO);
1054         }
1055
1056         /* return error if any niobuf was in error */
1057         for (i = 0; i < niocount; i++) {
1058                 if ((int)remote_rcs[i] < 0) {
1059                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1060                                i, remote_rcs[i], req);
1061                         return remote_rcs[i];
1062                 }
1063
1064                 if (remote_rcs[i] != 0) {
1065                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1066                                 i, remote_rcs[i], req);
1067                         return(-EPROTO);
1068                 }
1069         }
1070         if (req->rq_bulk != NULL &&
1071             req->rq_bulk->bd_nob_transferred != requested_nob) {
1072                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1073                        req->rq_bulk->bd_nob_transferred, requested_nob);
1074                 return(-EPROTO);
1075         }
1076
1077         return (0);
1078 }
1079
1080 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1081 {
1082         if (p1->flag != p2->flag) {
1083                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1084                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1085                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1086
1087                 /* warn if we try to combine flags that we don't know to be
1088                  * safe to combine */
1089                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1090                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1091                               "report this at https://jira.whamcloud.com/\n",
1092                               p1->flag, p2->flag);
1093                 }
1094                 return 0;
1095         }
1096
1097         return (p1->off + p1->count == p2->off);
1098 }
1099
#if IS_ENABLED(CONFIG_CRC_T10DIF)
/*
 * Compute a T10-PI (DIF) style checksum over a bulk page array.
 *
 * For each page, per-sector guard tags are generated by @fn into a
 * bounce page; whenever the bounce page fills with guard tags they are
 * folded into a running hash, and the final digest is returned through
 * @check_sum.  Fault-injection hooks simulate receive corruption
 * (OST_READ) and send-side checksum errors (OST_WRITE).
 *
 * Returns 0 on success or a negative errno on allocation/hash failure.
 */
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Used Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        /* bounce page used to batch guard tags before hashing */
        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                /* a page may only be partially covered by the transfer */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                /* bounce page full of guard tags: fold them into the hash */
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        /* hash any guard tags left over from a partially-filled batch */
        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */
1204
/*
 * Compute a plain (non-T10) bulk checksum by hashing each page's data
 * region with the algorithm mapped from @cksum_type; the digest is
 * returned through @cksum.  Fault-injection hooks simulate receive
 * corruption (OST_READ) and send-side checksum errors (OST_WRITE).
 *
 * Returns 0 on success or a negative errno if the hash cannot be set up.
 */
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                /* the last page may only be partially covered by the I/O */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}
1258
1259 static int osc_checksum_bulk_rw(const char *obd_name,
1260                                 enum cksum_types cksum_type,
1261                                 int nob, size_t pg_count,
1262                                 struct brw_page **pga, int opc,
1263                                 u32 *check_sum)
1264 {
1265         obd_dif_csum_fn *fn = NULL;
1266         int sector_size = 0;
1267         int rc;
1268
1269         ENTRY;
1270         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1271
1272         if (fn)
1273                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1274                                              opc, fn, sector_size, check_sum);
1275         else
1276                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1277                                        check_sum);
1278
1279         RETURN(rc);
1280 }
1281
/**
 * Build and pack a BRW (bulk read/write) RPC covering @page_count pages.
 *
 * Contiguous pages with compatible flags are merged into remote
 * niobufs; small single-niobuf transfers may be sent as "short io"
 * with the data inlined in the request/reply instead of a bulk
 * descriptor.  The wire obdo, optional bulk checksum and per-request
 * async state (osc_brw_async_args) are filled in, and the prepared
 * request is returned through @reqp with a reference the caller owns.
 *
 * \param cmd     OBD_BRW_WRITE selects a write, otherwise a read
 * \param resend  non-zero on a recovery resend; OBD_FL_RECOV_RESEND is
 *                set so the server can recognize the replay
 *
 * Returns 0 on success or a negative errno.
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw requests from a pre-allocated pool so that dirty
         * pages can always be flushed even under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: each run of mergeable pages becomes one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, variable
         * oa contains valid o_uid and o_gid in these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
         * other process logic */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        /* fill niobufs, merging contiguous pages; for short-io writes the
         * page data is copied inline into the request instead of a bulk */
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = kmap_atomic(pg->pg);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        kunmap_atomic(ptr);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a local grant shrink on the outgoing request */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                        rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                  requested_nob, page_count,
                                                  pga, OST_WRITE,
                                                  &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                           cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
                 * resent due to cksum error, this will allow Server to
                 * check+dump pages on its side */
        }
        ptlrpc_request_set_replen(req);

        aa = ptlrpc_req_async_args(aa, req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1559
/* path of the most recent checksum-error page dump (see
 * dump_all_bulk_pages()); also reused in log messages */
char dbgcksum_file_name[PATH_MAX];
1561
/*
 * Dump the raw pages of a bulk that failed its checksum to a file under
 * the debug file path, so the corrupt data can be examined post-mortem.
 * The file name encodes FID, byte range and both checksums; opening
 * with O_EXCL keeps only the first dump for a given range (resends and
 * retries are ignored).
 */
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;

        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                /* cfs_kernel_write() may write less than asked; loop
                 * until this page's data region is fully written out */
                while (len != 0) {
                        rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }

        rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
}
1621
/*
 * Investigate a write checksum mismatch reported by the server.
 *
 * \retval 0  the checksums actually agree, nothing to do
 * \retval 1  genuine mismatch; the caller resends the bulk (-EAGAIN)
 *
 * On mismatch the client re-checksums the pages it still holds, using the
 * checksum type from the server's reply, and compares the result against
 * both the original client checksum and the server checksum to guess where
 * the corruption happened (client memory, in transit, or both).
 */
static int
check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                     __u32 client_cksum, __u32 server_cksum,
                     struct osc_brw_async_args *aa)
{
        const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
        enum cksum_types cksum_type;
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        __u32 new_cksum;
        char *msg;
        int rc;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* optionally keep a dump of the pages for offline analysis */
        if (aa->aa_cli->cl_checksum_dump)
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);

        /* recompute with the checksum type the server replied with, which
         * may differ from the one used in the original request */
        cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                           oa->o_flags : 0);

        /* T10-PI types need the matching guard function and sector size;
         * all other types go through osc_checksum_bulk() below */
        switch (cksum_type) {
        case OBD_CKSUM_T10IP512:
                fn = obd_dif_ip_fn;
                sector_size = 512;
                break;
        case OBD_CKSUM_T10IP4K:
                fn = obd_dif_ip_fn;
                sector_size = 4096;
                break;
        case OBD_CKSUM_T10CRC512:
                fn = obd_dif_crc_fn;
                sector_size = 512;
                break;
        case OBD_CKSUM_T10CRC4K:
                fn = obd_dif_crc_fn;
                sector_size = 4096;
                break;
        default:
                break;
        }

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
                                             aa->aa_page_count, aa->aa_ppga,
                                             OST_WRITE, fn, sector_size,
                                             &new_cksum);
        else
                rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                       aa->aa_ppga, OST_WRITE, cksum_type,
                                       &new_cksum);

        /* pick the most likely explanation for the console message below */
        if (rc < 0)
                msg = "failed to calculate the client write checksum";
        else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
                           obd_name, msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                                aa->aa_ppga[aa->aa_page_count-1]->count - 1,
                           client_cksum,
                           obd_cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
}
1708
/* Note rc enters this function as number of bytes transferred */
/*
 * Finish a BRW RPC: unpack the reply, update quota/grant state, verify
 * checksums and short reads, and copy server attributes back into the
 * in-memory obdo.
 *
 * \retval 0 on success, -EAGAIN to request a resend, or a negative errno
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd *cli = aa->aa_cli;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;

        ENTRY;

        /* -EDQUOT is not fatal yet: the reply still carries the quota flags
         * and grant that must be processed below before returning it */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "cannot unpack body");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid/projid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
                unsigned qid[LL_MAXQUOTAS] = {
                                         body->oa.o_uid, body->oa.o_gid,
                                         body->oa.o_projid };
                CDEBUG(D_QUOTA,
                       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
                       osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
                                       body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        /* quota/grant state is updated; the -EDQUOT case ends here */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a write transfers no data back, so rc must be <= 0 here */
                if (rc > 0) {
                        CERROR("%s: unexpected positive size %d\n",
                               obd_name, rc);
                        RETURN(-EPROTO);
                }

                if (req->rq_bulk != NULL &&
                    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* -EAGAIN makes the caller resend the whole bulk write */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count, aa->aa_page_count,
                                     aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        if (req->rq_bulk == NULL) {
                /* short io: the data came back inline in the reply buffer */
                rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
                                          RCL_SERVER);
                LASSERT(rc == req->rq_status);
        } else {
                /* if unwrap_bulk failed, return -EAGAIN to retry */
                rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        }
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("%s: unexpected size %d, requested %d\n", obd_name,
                       rc, aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk == NULL) {
                /* short io */
                int nob, pg_count, i = 0;
                unsigned char *buf;

                CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
                pg_count = aa->aa_page_count;
                buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
                                                   rc);
                nob = rc;
                /* scatter the inline reply data into the destination pages,
                 * honouring each page's in-page offset */
                while (nob > 0 && pg_count > 0) {
                        unsigned char *ptr;
                        int count = aa->aa_ppga[i]->count > nob ?
                                    nob : aa->aa_ppga[i]->count;

                        CDEBUG(D_CACHE, "page %p count %d\n",
                               aa->aa_ppga[i]->pg, count);
                        ptr = kmap_atomic(aa->aa_ppga[i]->pg);
                        memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
                               count);
                        kunmap_atomic((void *) ptr);

                        buf += count;
                        nob -= count;
                        i++;
                        pg_count--;
                }
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                enum cksum_types cksum_type;
                u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
                        body->oa.o_flags : 0;

                /* verify the received data against the server checksum,
                 * using the checksum type taken from the reply */
                cksum_type = obd_cksum_type_unpack(o_flags);
                rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
                                          aa->aa_page_count, aa->aa_ppga,
                                          OST_READ, &client_cksum);
                if (rc < 0)
                        GOTO(out, rc);

                /* note in the error message if the bulk came via a router */
                if (req->rq_bulk != NULL &&
                    peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        struct ost_body *clbody;
                        u32 page_count = aa->aa_page_count;

                        clbody = req_capsule_client_get(&req->rq_pill,
                                                        &RMF_OST_BODY);
                        if (cli->cl_checksum_dump)
                                dump_all_bulk_pages(&clbody->oa, page_count,
                                                    aa->aa_ppga, server_cksum,
                                                    client_cksum);

                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent [%llu-%llu], client %x, "
                                           "server %x, cksum_type %x\n",
                                           obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_seq : 0ULL,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_oid : 0,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[page_count-1]->off +
                                           aa->aa_ppga[page_count-1]->count - 1,
                                           client_cksum, server_cksum,
                                           cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        /* -EAGAIN triggers a resend of the read */
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* rate-limit: (x & -x) == x only when x is a power of two */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("%s: checksum %u requested from %s but not sent\n",
                               obd_name, cksum_missed,
                               libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success (or benign rc) pull the server-supplied attributes
         * from the wire obdo back into the in-memory one */
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1913
/*
 * Resend a BRW request after a recoverable error (e.g. -EINPROGRESS or a
 * checksum mismatch).  A fresh request is built from @aa, the pga and oap
 * lists are moved over to it, and it is queued on ptlrpcd.
 *
 * \retval 0 when the replacement request has been submitted
 * \retval negative errno if it could not be built
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* The below message is checked in replay-ost-single.sh test_8ae*/
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        /* sanity check: every queued page must still reference the old
         * request (or none at all) */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                }
        }
        /*
         * New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it...
         */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_aa, new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* each page drops its reference on the old request and takes one
         * on the replacement instead */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1981
1982 /*
1983  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1984  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1985  * fine for our small page arrays and doesn't require allocation.  its an
1986  * insertion sort that swaps elements that are strides apart, shrinking the
1987  * stride down until its '1' and the array is sorted.
1988  */
1989 static void sort_brw_pages(struct brw_page **array, int num)
1990 {
1991         int stride, i, j;
1992         struct brw_page *tmp;
1993
1994         if (num == 1)
1995                 return;
1996         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1997                 ;
1998
1999         do {
2000                 stride /= 3;
2001                 for (i = stride ; i < num ; i++) {
2002                         tmp = array[i];
2003                         j = i;
2004                         while (j >= stride && array[j - stride]->off > tmp->off) {
2005                                 array[j] = array[j - stride];
2006                                 j -= stride;
2007                         }
2008                         array[j] = tmp;
2009                 }
2010         } while (stride > 1);
2011 }
2012
2013 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2014 {
2015         LASSERT(ppga != NULL);
2016         OBD_FREE(ppga, sizeof(*ppga) * count);
2017 }
2018
/*
 * Interpret callback for BRW RPCs, run from ptlrpcd when the request
 * completes.  Finishes the transfer, resends on recoverable errors,
 * updates the cached object attributes from the reply, then releases the
 * extents, pages and in-flight accounting held by the request.
 *
 * \retval 0 or the final (possibly translated) error for this RPC
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_brw_async_args *aa = args;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        unsigned long transferred = 0;

        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /*
         * When server returns -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.
         */
        if (osc_recoverable_error(rc) && !req->rq_no_delay) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import generation changed while this request was
                         * in flight: log only, no resend from here */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               "%llu:%llu, rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* resend was queued; it took over the pga/oap lists */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* copy each attribute the server marked valid into the
                 * cl_attr, under the object attribute lock */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        /* successfully written pages are "unstable" until the server
         * commits them; brw_commit()/osc_dec_unstable_pages() undoes this */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                /* a no-delay request reports -EWOULDBLOCK on any failure */
                osc_extent_finish(env, ext, 1,
                                  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        transferred = (req->rq_bulk == NULL ? /* short io */
                       aa->aa_requested_nob :
                       req->rq_bulk->bd_nob_transferred);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
2145
2146 static void brw_commit(struct ptlrpc_request *req)
2147 {
2148         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2149          * this called via the rq_commit_cb, I need to ensure
2150          * osc_dec_unstable_pages is still called. Otherwise unstable
2151          * pages may be leaked. */
2152         spin_lock(&req->rq_lock);
2153         if (likely(req->rq_unstable)) {
2154                 req->rq_unstable = 0;
2155                 spin_unlock(&req->rq_lock);
2156
2157                 osc_dec_unstable_pages(req);
2158         } else {
2159                 req->rq_committed = 1;
2160                 spin_unlock(&req->rq_lock);
2161         }
2162 }
2163
2164 /**
2165  * Build an RPC by the list of extent @ext_list. The caller must ensure
2166  * that the total pages in this list are NOT over max pages per RPC.
2167  * Extents in the list must be in OES_RPC state.
2168  */
2169 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2170                   struct list_head *ext_list, int cmd)
2171 {
2172         struct ptlrpc_request           *req = NULL;
2173         struct osc_extent               *ext;
2174         struct brw_page                 **pga = NULL;
2175         struct osc_brw_async_args       *aa = NULL;
2176         struct obdo                     *oa = NULL;
2177         struct osc_async_page           *oap;
2178         struct osc_object               *obj = NULL;
2179         struct cl_req_attr              *crattr = NULL;
2180         loff_t                          starting_offset = OBD_OBJECT_EOF;
2181         loff_t                          ending_offset = 0;
2182         int                             mpflag = 0;
2183         int                             mem_tight = 0;
2184         int                             page_count = 0;
2185         bool                            soft_sync = false;
2186         bool                            ndelay = false;
2187         int                             i;
2188         int                             grant = 0;
2189         int                             rc;
2190         __u32                           layout_version = 0;
2191         LIST_HEAD(rpc_list);
2192         struct ost_body                 *body;
2193         ENTRY;
2194         LASSERT(!list_empty(ext_list));
2195
2196         /* add pages into rpc_list to build BRW rpc */
2197         list_for_each_entry(ext, ext_list, oe_link) {
2198                 LASSERT(ext->oe_state == OES_RPC);
2199                 mem_tight |= ext->oe_memalloc;
2200                 grant += ext->oe_grants;
2201                 page_count += ext->oe_nr_pages;
2202                 layout_version = max(layout_version, ext->oe_layout_version);
2203                 if (obj == NULL)
2204                         obj = ext->oe_obj;
2205         }
2206
2207         soft_sync = osc_over_unstable_soft_limit(cli);
2208         if (mem_tight)
2209                 mpflag = cfs_memory_pressure_get_and_set();
2210
2211         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2212         if (pga == NULL)
2213                 GOTO(out, rc = -ENOMEM);
2214
2215         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2216         if (oa == NULL)
2217                 GOTO(out, rc = -ENOMEM);
2218
2219         i = 0;
2220         list_for_each_entry(ext, ext_list, oe_link) {
2221                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2222                         if (mem_tight)
2223                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2224                         if (soft_sync)
2225                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2226                         pga[i] = &oap->oap_brw_page;
2227                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2228                         i++;
2229
2230                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2231                         if (starting_offset == OBD_OBJECT_EOF ||
2232                             starting_offset > oap->oap_obj_off)
2233                                 starting_offset = oap->oap_obj_off;
2234                         else
2235                                 LASSERT(oap->oap_page_off == 0);
2236                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2237                                 ending_offset = oap->oap_obj_off +
2238                                                 oap->oap_count;
2239                         else
2240                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2241                                         PAGE_SIZE);
2242                 }
2243                 if (ext->oe_ndelay)
2244                         ndelay = true;
2245         }
2246
2247         /* first page in the list */
2248         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2249
2250         crattr = &osc_env_info(env)->oti_req_attr;
2251         memset(crattr, 0, sizeof(*crattr));
2252         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2253         crattr->cra_flags = ~0ULL;
2254         crattr->cra_page = oap2cl_page(oap);
2255         crattr->cra_oa = oa;
2256         cl_req_attr_set(env, osc2cl(obj), crattr);
2257
2258         if (cmd == OBD_BRW_WRITE) {
2259                 oa->o_grant_used = grant;
2260                 if (layout_version > 0) {
2261                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2262                                PFID(&oa->o_oi.oi_fid), layout_version);
2263
2264                         oa->o_layout_version = layout_version;
2265                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2266                 }
2267         }
2268
2269         sort_brw_pages(pga, page_count);
2270         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2271         if (rc != 0) {
2272                 CERROR("prep_req failed: %d\n", rc);
2273                 GOTO(out, rc);
2274         }
2275
2276         req->rq_commit_cb = brw_commit;
2277         req->rq_interpret_reply = brw_interpret;
2278         req->rq_memalloc = mem_tight != 0;
2279         oap->oap_request = ptlrpc_request_addref(req);
2280         if (ndelay) {
2281                 req->rq_no_resend = req->rq_no_delay = 1;
2282                 /* probably set a shorter timeout value.
2283                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2284                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2285         }
2286
2287         /* Need to update the timestamps after the request is built in case
2288          * we race with setattr (locally or in queue at OST).  If OST gets
2289          * later setattr before earlier BRW (as determined by the request xid),
2290          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2291          * way to do this in a single call.  bug 10150 */
2292         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2293         crattr->cra_oa = &body->oa;
2294         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2295         cl_req_attr_set(env, osc2cl(obj), crattr);
2296         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2297
2298         aa = ptlrpc_req_async_args(aa, req);
2299         INIT_LIST_HEAD(&aa->aa_oaps);
2300         list_splice_init(&rpc_list, &aa->aa_oaps);
2301         INIT_LIST_HEAD(&aa->aa_exts);
2302         list_splice_init(ext_list, &aa->aa_exts);
2303
2304         spin_lock(&cli->cl_loi_list_lock);
2305         starting_offset >>= PAGE_SHIFT;
2306         if (cmd == OBD_BRW_READ) {
2307                 cli->cl_r_in_flight++;
2308                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2309                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2310                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2311                                       starting_offset + 1);
2312         } else {
2313                 cli->cl_w_in_flight++;
2314                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2315                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2316                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2317                                       starting_offset + 1);
2318         }
2319         spin_unlock(&cli->cl_loi_list_lock);
2320
2321         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2322                   page_count, aa, cli->cl_r_in_flight,
2323                   cli->cl_w_in_flight);
2324         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2325
2326         ptlrpcd_add_req(req);
2327         rc = 0;
2328         EXIT;
2329
2330 out:
2331         if (mem_tight != 0)
2332                 cfs_memory_pressure_restore(mpflag);
2333
2334         if (rc != 0) {
2335                 LASSERT(req == NULL);
2336
2337                 if (oa)
2338                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2339                 if (pga)
2340                         OBD_FREE(pga, sizeof(*pga) * page_count);
2341                 /* this should happen rarely and is pretty bad, it makes the
2342                  * pending list not follow the dirty order */
2343                 while (!list_empty(ext_list)) {
2344                         ext = list_entry(ext_list->next, struct osc_extent,
2345                                          oe_link);
2346                         list_del_init(&ext->oe_link);
2347                         osc_extent_finish(env, ext, 0, rc);
2348                 }
2349         }
2350         RETURN(rc);
2351 }
2352
2353 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2354 {
2355         int set = 0;
2356
2357         LASSERT(lock != NULL);
2358
2359         lock_res_and_lock(lock);
2360
2361         if (lock->l_ast_data == NULL)
2362                 lock->l_ast_data = data;
2363         if (lock->l_ast_data == data)
2364                 set = 1;
2365
2366         unlock_res_and_lock(lock);
2367
2368         return set;
2369 }
2370
/*
 * Common completion path for an OSC lock enqueue.
 *
 * Translate the enqueue result (for aborted intent enqueues the real status
 * is carried in the intent reply), report it through @upcall, and drop the
 * lock reference taken in ldlm_cli_enqueue() when the lock was granted or
 * matched.
 *
 * \param req         the enqueue RPC (created before the ldlm_cli_enqueue
 *                    call)
 * \param upcall      callback used to deliver the final result
 * \param cookie      opaque argument passed through to \a upcall
 * \param lockh       handle of the lock being enqueued
 * \param mode        mode the enqueue reference was taken with
 * \param flags       in/out LDLM flags; LDLM_FL_LVB_READY may be set here
 * \param speculative true for locks enqueued ahead of any IO
 * \param errcode     enqueue result so far
 *
 * \retval            the return value of \a upcall
 */
int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
                     void *cookie, struct lustre_handle *lockh,
                     enum ldlm_mode mode, __u64 *flags, bool speculative,
                     int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* the server-side status overrides ELDLM_LOCK_ABORTED */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!speculative)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue(); a matched lock
         * counts as a successful enqueue for this purpose */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2408
/*
 * Interpret callback for an asynchronous lock enqueue RPC.
 *
 * Finishes the LDLM side of the enqueue (ldlm_cli_enqueue_fini()) and then
 * the OSC side (osc_enqueue_fini()), holding an extra lock reference across
 * both so that any blocking AST posted for a failed lock is processed only
 * after the upcall has run.
 *
 * \retval the result propagated from osc_enqueue_fini()
 */
int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                          void *args, int rc)
{
        struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        /* scratch flags storage for the speculative case below */
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh %#llx, req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* a speculative enqueue was submitted with NULL lvb/flags (see
         * osc_enqueue_base()); point oa_flags at local storage so the
         * fini calls below have somewhere to write */
        if (aa->oa_speculative) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2461
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained.
 *
 * First tries to match an existing compatible lock (reusing a PW lock for
 * reads as well); only when no match is found is a new enqueue RPC issued,
 * either asynchronously via @rqset or synchronously. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
                     void *cookie, struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async,
                     bool speculative)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        /* Normal lock requests must wait for the LVB to be ready before
         * matching a lock; speculative lock requests do not need to,
         * because they will not actually use the lock. */
        if (!speculative)
                match_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (speculative) {
                        /* This DLM lock request is speculative, and does not
                         * have an associated IO request. Therefore if there
                         * is already a DLM lock, it will just inform the
                         * caller to cancel the request for this stripe.*/
                        lock_res_and_lock(matched);
                        if (ldlm_extent_equal(&policy->l_extent,
                            &matched->l_policy_data.l_extent))
                                rc = -EEXIST;
                        else
                                rc = -ECANCELED;
                        unlock_res_and_lock(matched);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(rc);
                } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* the matched lock is owned by another object;
                         * fall through and enqueue a new lock */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);

        /* an intent enqueue carries an LVB in the reply, so the request
         * must be built by hand instead of inside ldlm_cli_enqueue() */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        aa = ptlrpc_req_async_args(aa, req);
                        aa->oa_exp         = exp;
                        aa->oa_mode        = einfo->ei_mode;
                        aa->oa_type        = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall      = upcall;
                        aa->oa_cookie      = cookie;
                        aa->oa_speculative = speculative;
                        if (!speculative) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* speculative locks are essentially to enqueue
                                 * a DLM lock  in advance, so we don't care
                                 * about the result of the enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, speculative, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2614
/*
 * Match an existing extent lock covering the given extent.
 *
 * When a lock is found and \a obj is non-NULL, attach \a obj to the lock's
 * AST data and, on first use, seed the object's attributes from the lock's
 * LVB.  If the lock already belongs to another object the reference is
 * dropped and 0 is returned.
 *
 * \retval matched lock mode (non-zero) on success, 0 when no usable lock
 *         was found
 */
int osc_match_base(const struct lu_env *env, struct obd_export *exp,
                   struct ldlm_res_id *res_id, enum ldlm_type type,
                   union ldlm_policy_data *policy, enum ldlm_mode mode,
                   __u64 *flags, struct osc_object *obj,
                   struct lustre_handle *lockh, int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        enum ldlm_mode rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);

        if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);

                LASSERT(lock != NULL);
                if (osc_set_lock_data(lock, obj)) {
                        lock_res_and_lock(lock);
                        /* update the object's attributes from the LVB once
                         * per lock, under the resource lock */
                        if (!ldlm_is_lvb_cached(lock)) {
                                LASSERT(lock->l_ast_data == obj);
                                osc_lock_lvb_update(env, obj, lock, NULL);
                                ldlm_set_lvb_cached(lock);
                        }
                        unlock_res_and_lock(lock);
                } else {
                        /* lock belongs to another object; give it up */
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
                LDLM_LOCK_PUT(lock);
        }
        RETURN(rc);
}
2666
2667 static int osc_statfs_interpret(const struct lu_env *env,
2668                                 struct ptlrpc_request *req, void *args, int rc)
2669 {
2670         struct osc_async_args *aa = args;
2671         struct obd_statfs *msfs;
2672
2673         ENTRY;
2674         if (rc == -EBADR)
2675                 /*
2676                  * The request has in fact never been sent due to issues at
2677                  * a higher level (LOV).  Exit immediately since the caller
2678                  * is aware of the problem and takes care of the clean up.
2679                  */
2680                 RETURN(rc);
2681
2682         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2683             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2684                 GOTO(out, rc = 0);
2685
2686         if (rc != 0)
2687                 GOTO(out, rc);
2688
2689         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2690         if (msfs == NULL)
2691                 GOTO(out, rc = -EPROTO);
2692
2693         *aa->aa_oi->oi_osfs = *msfs;
2694 out:
2695         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2696
2697         RETURN(rc);
2698 }
2699
/*
 * Asynchronous statfs: answer from the cached obd_osfs when it is newer
 * than \a max_age, otherwise send an OST_STATFS RPC and deliver the result
 * through oinfo->oi_cb_up from osc_statfs_interpret().
 *
 * \retval 0 on success (cached answer or RPC queued), negative errno on
 *         allocation/pack failure
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, time64_t max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;
        ENTRY;

        /* cached data is fresh enough: answer without an RPC */
        if (obd->obd_osfs_age >= max_age) {
                CDEBUG(D_SUPER,
                       "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
                       obd->obd_name, &obd->obd_osfs,
                       obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
                       obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
                spin_lock(&obd->obd_osfs_lock);
                memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
                spin_unlock(&obd->obd_osfs_lock);
                oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
                if (oinfo->oi_cb_up)
                        oinfo->oi_cb_up(oinfo, 0);

                RETURN(0);
        }

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated statfs must not wait for recovery,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        aa = ptlrpc_req_async_args(aa, req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2758
/*
 * Synchronous statfs: send an OST_STATFS RPC and copy the reply into
 * \a osfs.  \a max_age is currently not sent to the server (see comment
 * below).  OBD_STATFS_NODELAY makes the request fail fast instead of
 * waiting through recovery.
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;


        /* Since the request might also come from lprocfs, we need to
         * sync this with client_disconnect_export (Bug15684): take a
         * reference on the import under cl_sem so it cannot vanish */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated statfs must not wait for recovery,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);

        *osfs = *msfs;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
2822
2823 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2824                          void *karg, void __user *uarg)
2825 {
2826         struct obd_device *obd = exp->exp_obd;
2827         struct obd_ioctl_data *data = karg;
2828         int rc = 0;
2829
2830         ENTRY;
2831         if (!try_module_get(THIS_MODULE)) {
2832                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2833                        module_name(THIS_MODULE));
2834                 return -EINVAL;
2835         }
2836         switch (cmd) {
2837         case OBD_IOC_CLIENT_RECOVER:
2838                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2839                                            data->ioc_inlbuf1, 0);
2840                 if (rc > 0)
2841                         rc = 0;
2842                 break;
2843         case IOC_OSC_SET_ACTIVE:
2844                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2845                                               data->ioc_offset);
2846                 break;
2847         default:
2848                 rc = -ENOTTY;
2849                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2850                        obd->obd_name, cmd, current_comm(), rc);
2851                 break;
2852         }
2853
2854         module_put(THIS_MODULE);
2855         return rc;
2856 }
2857
/*
 * Handle obd_set_info_async() for the OSC layer.
 *
 * Keys handled locally: KEY_CHECKSUM (toggle client checksums),
 * KEY_SPTLRPC_CONF (adapt security config), KEY_FLUSH_CTX (flush security
 * contexts), KEY_CACHE_LRU_SHRINK (shrink the LRU page cache).  All other
 * keys are forwarded to the OST via OST_SET_INFO; KEY_GRANT_SHRINK is sent
 * through ptlrpcd with a grant-shrink interpret callback, everything else
 * is added to \a set.
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                       u32 keylen, void *key, u32 vallen, void *val,
                       struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of the pages currently on the LRU */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how much of the target remains */
                *(long *)val -= nr;
                RETURN(0);
        }

        /* only grant shrink may proceed without a request set */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant shrink carries an ost_body instead of a raw value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                aa = ptlrpc_req_async_args(aa, req);
                OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                /* keep a copy of the obdo for the interpret callback */
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_set_info_async);
2959
2960 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2961                   struct obd_device *obd, struct obd_uuid *cluuid,
2962                   struct obd_connect_data *data, void *localdata)
2963 {
2964         struct client_obd *cli = &obd->u.cli;
2965
2966         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2967                 long lost_grant;
2968                 long grant;
2969
2970                 spin_lock(&cli->cl_loi_list_lock);
2971                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2972                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2973                         /* restore ocd_grant_blkbits as client page bits */
2974                         data->ocd_grant_blkbits = PAGE_SHIFT;
2975                         grant += cli->cl_dirty_grant;
2976                 } else {
2977                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2978                 }
2979                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2980                 lost_grant = cli->cl_lost_grant;
2981                 cli->cl_lost_grant = 0;
2982                 spin_unlock(&cli->cl_loi_list_lock);
2983
2984                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2985                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2986                        data->ocd_version, data->ocd_grant, lost_grant);
2987         }
2988
2989         RETURN(0);
2990 }
2991 EXPORT_SYMBOL(osc_reconnect);
2992
/*
 * Disconnect the export and remove this client from the grant-shrink list.
 *
 * \retval the result of client_disconnect_export()
 */
int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     osc_init_grant
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! grant shrink thread trigger the shrink. BUG18662
         */
        osc_del_grant_list(&obd->u.cli);
        return rc;
}
EXPORT_SYMBOL(osc_disconnect);
3018
3019 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3020                                  struct hlist_node *hnode, void *arg)
3021 {
3022         struct lu_env *env = arg;
3023         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3024         struct ldlm_lock *lock;
3025         struct osc_object *osc = NULL;
3026         ENTRY;
3027
3028         lock_res(res);
3029         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3030                 if (lock->l_ast_data != NULL && osc == NULL) {
3031                         osc = lock->l_ast_data;
3032                         cl_object_get(osc2cl(osc));
3033                 }
3034
3035                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3036                  * by the 2nd round of ldlm_namespace_clean() call in
3037                  * osc_import_event(). */
3038                 ldlm_clear_cleaned(lock);
3039         }
3040         unlock_res(res);
3041
3042         if (osc != NULL) {
3043                 osc_object_invalidate(env, osc);
3044                 cl_object_put(env, osc2cl(osc));
3045         }
3046
3047         RETURN(0);
3048 }
3049 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3050
3051 static int osc_import_event(struct obd_device *obd,
3052                             struct obd_import *imp,
3053                             enum obd_import_event event)
3054 {
3055         struct client_obd *cli;
3056         int rc = 0;
3057
3058         ENTRY;
3059         LASSERT(imp->imp_obd == obd);
3060
3061         switch (event) {
3062         case IMP_EVENT_DISCON: {
3063                 cli = &obd->u.cli;
3064                 spin_lock(&cli->cl_loi_list_lock);
3065                 cli->cl_avail_grant = 0;
3066                 cli->cl_lost_grant = 0;
3067                 spin_unlock(&cli->cl_loi_list_lock);
3068                 break;
3069         }
3070         case IMP_EVENT_INACTIVE: {
3071                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3072                 break;
3073         }
3074         case IMP_EVENT_INVALIDATE: {
3075                 struct ldlm_namespace *ns = obd->obd_namespace;
3076                 struct lu_env         *env;
3077                 __u16                  refcheck;
3078
3079                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3080
3081                 env = cl_env_get(&refcheck);
3082                 if (!IS_ERR(env)) {
3083                         osc_io_unplug(env, &obd->u.cli, NULL);
3084
3085                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3086                                                  osc_ldlm_resource_invalidate,
3087                                                  env, 0);
3088                         cl_env_put(env, &refcheck);
3089
3090                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3091                 } else
3092                         rc = PTR_ERR(env);
3093                 break;
3094         }
3095         case IMP_EVENT_ACTIVE: {
3096                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3097                 break;
3098         }
3099         case IMP_EVENT_OCD: {
3100                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3101
3102                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3103                         osc_init_grant(&obd->u.cli, ocd);
3104
3105                 /* See bug 7198 */
3106                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3107                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3108
3109                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3110                 break;
3111         }
3112         case IMP_EVENT_DEACTIVATE: {
3113                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3114                 break;
3115         }
3116         case IMP_EVENT_ACTIVATE: {
3117                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3118                 break;
3119         }
3120         default:
3121                 CERROR("Unknown import event %d\n", event);
3122                 LBUG();
3123         }
3124         RETURN(rc);
3125 }
3126
3127 /**
3128  * Determine whether the lock can be canceled before replaying the lock
3129  * during recovery, see bug16774 for detailed information.
3130  *
3131  * \retval zero the lock can't be canceled
3132  * \retval other ok to cancel
3133  */
3134 static int osc_cancel_weight(struct ldlm_lock *lock)
3135 {
3136         /*
3137          * Cancel all unused and granted extent lock.
3138          */
3139         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3140             ldlm_is_granted(lock) &&
3141             osc_ldlm_weigh_ast(lock) == 0)
3142                 RETURN(1);
3143
3144         RETURN(0);
3145 }
3146
3147 static int brw_queue_work(const struct lu_env *env, void *data)
3148 {
3149         struct client_obd *cli = data;
3150
3151         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3152
3153         osc_io_unplug(env, cli, NULL);
3154         RETURN(0);
3155 }
3156
3157 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3158 {
3159         struct client_obd *cli = &obd->u.cli;
3160         void *handler;
3161         int rc;
3162