LU-10472 osc: add T10PI support for RPC checksum
lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

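/*
 * Pack @oa into the OST body of an already-packed request, converting
 * the in-memory obdo to its wire format according to the features
 * negotiated on the import at connect time.
 */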
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

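/*
 * Synchronously fetch the attributes of an object from the OST.  On
 * success the reply is unpacked back into @oa, and o_blksize is filled
 * in with the client's preferred BRW size, which the server does not
 * provide.
 */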
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

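/*
 * Send an OST_SETATTR asynchronously.  With a NULL @rqset the request
 * is fire-and-forget via ptlrpcd; otherwise osc_setattr_interpret()
 * runs on completion and invokes @upcall with @cookie and the result.
 */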
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response.  The upcall and cookie
 * may also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

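/*
 * Synchronous object create.  Only used for objects in the echo
 * sequence, as the fid_seq_is_echo() assertion below enforces.
 */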
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

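/*
 * Send an OST_PUNCH request through the I/O portal.  This is always
 * asynchronous: completion is reported through osc_setattr_interpret(),
 * which calls @upcall with @cookie.
 */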
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

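/*
 * Send an OST_SYNC request for @obj.  The size/blocks fields of @oa are
 * overloaded with the start/end of the range to flush; the reply is
 * handled by osc_sync_interpret(), which also refreshes the object's
 * blocks attribute.
 */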
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode on the resource
 * identified by @oa. Found locks are added to the @cancels list. Returns
 * the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported
         * at all, when we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

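/*
 * Throttle destroy RPCs: optimistically take a slot and return 1 if we
 * are still within cl_max_rpcs_in_flight, otherwise release the slot
 * again.  Since the counter may change between the two atomic ops, wake
 * a waiter if the release shows that a slot has become available.
 */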
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

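/*
 * Fill in the dirty and grant accounting fields of @oa so that each RPC
 * reports the client's cache state to the server.  o_undirty is the
 * additional grant being requested, clamped to 0 on suspicious counter
 * values and capped safely below OBD_MAX_GRANT.
 */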
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, thus they may race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT -
                                    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

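/*
 * Release grant down to @target_bytes by handing the excess back to the
 * server via a KEY_GRANT_SHRINK set_info RPC.  Never shrinks below what
 * a single full RPC needs, and restores the grant locally if sending
 * fails.
 */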
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

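/*
 * Initialize grant accounting from the server's connect data.  When
 * GRANT_PARAM was negotiated this also derives the extent tax, chunk
 * size and maximum extent size used by osc_extent.
 */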
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

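/*
 * Validate the per-niobuf return codes of a BRW_WRITE reply: fail with
 * the first negative rc found, treat any other non-zero rc as a
 * protocol error, and verify that the bulk transferred exactly the
 * number of bytes requested.
 */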
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

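/*
 * Compute the T10-PI checksum of a bulk: generate a DIF guard tag with
 * @fn for each @sector_size chunk of every page, collect the tags in a
 * bounce page, and hash the tags with the OBD_CKSUM_T10_TOP algorithm
 * to produce the final 32-bit RPC checksum.
 */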
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct cfs_crypto_hash_desc *hdesc;
        /* Use Adler as the default checksum type on top of the DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                rc = PTR_ERR(hdesc);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The number of guard slots left must be enough to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, 0,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(hdesc, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(hdesc, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}

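/*
 * Compute a plain (non-T10) bulk checksum by hashing the page data
 * directly with the algorithm selected by @cksum_type.
 */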
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);

        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

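/*
 * Dispatch to the proper bulk checksum implementation: the T10-PI path
 * when @cksum_type maps to a DIF checksum function, the plain page hash
 * otherwise.
 */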
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

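/*
 * Prepare a BRW (bulk read/write) request: merge contiguous pages into
 * remote niobufs, attach either a bulk descriptor or an inline short-io
 * buffer, announce cached/dirty state for grant accounting, and compute
 * the bulk checksum when checksums are enabled.
 */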
1210 static int
1211 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1212                      u32 page_count, struct brw_page **pga,
1213                      struct ptlrpc_request **reqp, int resend)
1214 {
1215         struct ptlrpc_request   *req;
1216         struct ptlrpc_bulk_desc *desc;
1217         struct ost_body         *body;
1218         struct obd_ioobj        *ioobj;
1219         struct niobuf_remote    *niobuf;
1220         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1221         struct osc_brw_async_args *aa;
1222         struct req_capsule      *pill;
1223         struct brw_page *pg_prev;
1224         void *short_io_buf;
1225         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1226
1227         ENTRY;
1228         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1229                 RETURN(-ENOMEM); /* Recoverable */
1230         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1231                 RETURN(-EINVAL); /* Fatal */
1232
1233         if ((cmd & OBD_BRW_WRITE) != 0) {
1234                 opc = OST_WRITE;
1235                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1236                                                 osc_rq_pool,
1237                                                 &RQF_OST_BRW_WRITE);
1238         } else {
1239                 opc = OST_READ;
1240                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1241         }
1242         if (req == NULL)
1243                 RETURN(-ENOMEM);
1244
1245         for (niocount = i = 1; i < page_count; i++) {
1246                 if (!can_merge_pages(pga[i - 1], pga[i]))
1247                         niocount++;
1248         }
1249
1250         pill = &req->rq_pill;
1251         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1252                              sizeof(*ioobj));
1253         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1254                              niocount * sizeof(*niobuf));
1255
1256         for (i = 0; i < page_count; i++)
1257                 short_io_size += pga[i]->count;
1258
1259         /* Check if we can do a short io. */
1260         if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
1261             imp_connect_shortio(cli->cl_import)))
1262                 short_io_size = 0;
1263
1264         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1265                              opc == OST_READ ? 0 : short_io_size);
1266         if (opc == OST_READ)
1267                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1268                                      short_io_size);
1269
1270         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1271         if (rc) {
1272                 ptlrpc_request_free(req);
1273                 RETURN(rc);
1274         }
1275         osc_set_io_portal(req);
1276
1277         ptlrpc_at_set_req_timeout(req);
1278         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1279          * retry logic */
1280         req->rq_no_retry_einprogress = 1;
1281
1282         if (short_io_size != 0) {
1283                 desc = NULL;
1284                 short_io_buf = NULL;
1285                 goto no_bulk;
1286         }
1287
1288         desc = ptlrpc_prep_bulk_imp(req, page_count,
1289                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1290                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1291                         PTLRPC_BULK_PUT_SINK) |
1292                         PTLRPC_BULK_BUF_KIOV,
1293                 OST_BULK_PORTAL,
1294                 &ptlrpc_bulk_kiov_pin_ops);
1295
1296         if (desc == NULL)
1297                 GOTO(out, rc = -ENOMEM);
1298         /* NB request now owns desc and will free it when it gets freed */
1299 no_bulk:
1300         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1301         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1302         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1303         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1304
1305         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1306
1307         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1308          * and from_kgid(), because they are asynchronous. Fortunately, variable
1309          * oa contains valid o_uid and o_gid in these two operations.
1310          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1311          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1312          * other process logic */
1313         body->oa.o_uid = oa->o_uid;
1314         body->oa.o_gid = oa->o_gid;
1315
1316         obdo_to_ioobj(oa, ioobj);
1317         ioobj->ioo_bufcnt = niocount;
1318         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1319          * that might be send for this request.  The actual number is decided
1320          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1321          * "max - 1" for old client compatibility sending "0", and also so the
1322          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1323         if (desc != NULL)
1324                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1325         else /* short io */
1326                 ioobj_max_brw_set(ioobj, 0);
1327
1328         if (short_io_size != 0) {
1329                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1330                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1331                         body->oa.o_flags = 0;
1332                 }
1333                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1334                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1335                        short_io_size);
1336                 if (opc == OST_WRITE) {
1337                         short_io_buf = req_capsule_client_get(pill,
1338                                                               &RMF_SHORT_IO);
1339                         LASSERT(short_io_buf != NULL);
1340                 }
1341         }
1342
1343         LASSERT(page_count > 0);
1344         pg_prev = pga[0];
1345         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1346                 struct brw_page *pg = pga[i];
1347                 int poff = pg->off & ~PAGE_MASK;
1348
1349                 LASSERT(pg->count > 0);
1350                 /* make sure there is no gap in the middle of page array */
1351                 LASSERTF(page_count == 1 ||
1352                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1353                           ergo(i > 0 && i < page_count - 1,
1354                                poff == 0 && pg->count == PAGE_SIZE)   &&
1355                           ergo(i == page_count - 1, poff == 0)),
1356                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1357                          i, page_count, pg, pg->off, pg->count);
1358                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1359                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1360                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1361                          i, page_count,
1362                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1363                          pg_prev->pg, page_private(pg_prev->pg),
1364                          pg_prev->pg->index, pg_prev->off);
1365                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1366                         (pg->flag & OBD_BRW_SRVLOCK));
1367                 if (short_io_size != 0 && opc == OST_WRITE) {
1368                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1369
1370                         LASSERT(short_io_size >= requested_nob + pg->count);
1371                         memcpy(short_io_buf + requested_nob,
1372                                ptr + poff,
1373                                pg->count);
1374                         ll_kunmap_atomic(ptr, KM_USER0);
1375                 } else if (short_io_size == 0) {
1376                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1377                                                          pg->count);
1378                 }
1379                 requested_nob += pg->count;
1380
1381                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1382                         niobuf--;
1383                         niobuf->rnb_len += pg->count;
1384                 } else {
1385                         niobuf->rnb_offset = pg->off;
1386                         niobuf->rnb_len    = pg->count;
1387                         niobuf->rnb_flags  = pg->flag;
1388                 }
1389                 pg_prev = pg;
1390         }
1391
1392         LASSERTF((void *)(niobuf - niocount) ==
1393                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1394                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1395                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1396
1397         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1398         if (resend) {
1399                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1400                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1401                         body->oa.o_flags = 0;
1402                 }
1403                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1404         }
1405
1406         if (osc_should_shrink_grant(cli))
1407                 osc_shrink_grant_local(cli, &body->oa);
1408
1409         /* size[REQ_REC_OFF] still sizeof (*body) */
1410         if (opc == OST_WRITE) {
1411                 if (cli->cl_checksum &&
1412                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1413                         /* store cl_cksum_type in a local variable since
1414                          * it can be changed via lprocfs */
1415                         enum cksum_types cksum_type = cli->cl_cksum_type;
1416
1417                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1418                                 body->oa.o_flags = 0;
1419
1420                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1421                                                                 cksum_type);
1422                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1423
1424                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1425                                                   requested_nob, page_count,
1426                                                   pga, OST_WRITE,
1427                                                   &body->oa.o_cksum);
1428                         if (rc < 0) {
1429                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1430                                        rc);
1431                                 GOTO(out, rc);
1432                         }
1433                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1434                                body->oa.o_cksum);
1435
1436                         /* save this in 'oa', too, for later checking */
1437                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1438                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1439                                                            cksum_type);
1440                 } else {
1441                         /* clear out the checksum flag, in case this is a
1442                          * resend but cl_checksum is no longer set. b=11238 */
1443                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1444                 }
1445                 oa->o_cksum = body->oa.o_cksum;
1446                 /* 1 RC per niobuf */
1447                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1448                                      sizeof(__u32) * niocount);
1449         } else {
1450                 if (cli->cl_checksum &&
1451                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1452                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1453                                 body->oa.o_flags = 0;
1454                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1455                                 cli->cl_cksum_type);
1456                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1457                 }
1458
1459                 /* The client cksum has already been copied to the wire
1460                  * obdo in the earlier lustre_set_wire_obdo(); if a bulk
1461                  * read is being resent due to a cksum error, this lets
1462                  * the server check+dump the pages on its side */
1463         }
1464         ptlrpc_request_set_replen(req);
1465
1466         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1467         aa = ptlrpc_req_async_args(req);
1468         aa->aa_oa = oa;
1469         aa->aa_requested_nob = requested_nob;
1470         aa->aa_nio_count = niocount;
1471         aa->aa_page_count = page_count;
1472         aa->aa_resends = 0;
1473         aa->aa_ppga = pga;
1474         aa->aa_cli = cli;
1475         INIT_LIST_HEAD(&aa->aa_oaps);
1476
1477         *reqp = req;
1478         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1479         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1480                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1481                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1482         RETURN(0);
1483
1484  out:
1485         ptlrpc_req_finished(req);
1486         RETURN(rc);
1487 }
1488
1489 char dbgcksum_file_name[PATH_MAX];
1490
1491 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1492                                 struct brw_page **pga, __u32 server_cksum,
1493                                 __u32 client_cksum)
1494 {
1495         struct file *filp;
1496         int rc, i;
1497         unsigned int len;
1498         char *buf;
1499
1500         /* only keep a dump of the pages on the first error for a given
1501          * range in the file/fid, not during resends/retries. */
1502         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1503                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1504                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1505                   libcfs_debug_file_path_arr :
1506                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1507                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1508                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1509                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1510                  pga[0]->off,
1511                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1512                  client_cksum, server_cksum);
1513         filp = filp_open(dbgcksum_file_name,
1514                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1515         if (IS_ERR(filp)) {
1516                 rc = PTR_ERR(filp);
1517                 if (rc == -EEXIST)
1518                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1519                                "checksum error: rc = %d\n", dbgcksum_file_name,
1520                                rc);
1521                 else
1522                         CERROR("%s: can't open to dump pages with checksum "
1523                                "error: rc = %d\n", dbgcksum_file_name, rc);
1524                 return;
1525         }
1526
1527         for (i = 0; i < page_count; i++) {
1528                 len = pga[i]->count;
1529                 buf = kmap(pga[i]->pg);
1530                 while (len != 0) {
1531                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1532                         if (rc < 0) {
1533                                 CERROR("%s: wanted to write %u but got %d "
1534                                        "error\n", dbgcksum_file_name, len, rc);
1535                                 break;
1536                         }
1537                         len -= rc;
1538                         buf += rc;
1539                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1540                                dbgcksum_file_name, rc);
1541                 }
1542                 kunmap(pga[i]->pg);
1543         }
1544
1545         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1546         if (rc)
1547                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1548         filp_close(filp, NULL);
1549         return;
1550 }
1551
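/*
 * Re-verify a write checksum that the server rejected, to localize where the
 * data was modified.  For the T10-PI checksum types the recomputation uses
 * the matching DIF function and sector size (mirroring the switch below):
 *
 *   OBD_CKSUM_T10IP512  -> obd_dif_ip_fn,  512-byte sectors
 *   OBD_CKSUM_T10IP4K   -> obd_dif_ip_fn,  4096-byte sectors
 *   OBD_CKSUM_T10CRC512 -> obd_dif_crc_fn, 512-byte sectors
 *   OBD_CKSUM_T10CRC4K  -> obd_dif_crc_fn, 4096-byte sectors
 *
 * Comparing the recomputed checksum with the original client value and the
 * server value distinguishes "changed on the client" from "changed in
 * transit" from a protocol-level checksum-type mismatch.  Returns 0 if the
 * client and server checksums match, 1 otherwise.
 */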
1552 static int
1553 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1554                      __u32 client_cksum, __u32 server_cksum,
1555                      struct osc_brw_async_args *aa)
1556 {
1557         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1558         enum cksum_types cksum_type;
1559         obd_dif_csum_fn *fn = NULL;
1560         int sector_size = 0;
1561         bool t10pi = false;
1562         __u32 new_cksum;
1563         char *msg;
1564         int rc;
1565
1566         if (server_cksum == client_cksum) {
1567                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1568                 return 0;
1569         }
1570
1571         if (aa->aa_cli->cl_checksum_dump)
1572                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1573                                     server_cksum, client_cksum);
1574
1575         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1576                                            oa->o_flags : 0);
1577
1578         switch (cksum_type) {
1579         case OBD_CKSUM_T10IP512:
1580                 t10pi = true;
1581                 fn = obd_dif_ip_fn;
1582                 sector_size = 512;
1583                 break;
1584         case OBD_CKSUM_T10IP4K:
1585                 t10pi = true;
1586                 fn = obd_dif_ip_fn;
1587                 sector_size = 4096;
1588                 break;
1589         case OBD_CKSUM_T10CRC512:
1590                 t10pi = true;
1591                 fn = obd_dif_crc_fn;
1592                 sector_size = 512;
1593                 break;
1594         case OBD_CKSUM_T10CRC4K:
1595                 t10pi = true;
1596                 fn = obd_dif_crc_fn;
1597                 sector_size = 4096;
1598                 break;
1599         default:
1600                 break;
1601         }
1602
1603         if (t10pi)
1604                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1605                                              aa->aa_page_count,
1606                                              aa->aa_ppga,
1607                                              OST_WRITE,
1608                                              fn,
1609                                              sector_size,
1610                                              &new_cksum);
1611         else
1612                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1613                                        aa->aa_ppga, OST_WRITE, cksum_type,
1614                                        &new_cksum);
1615
1616         if (rc < 0)
1617                 msg = "failed to calculate the client write checksum";
1618         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1619                 msg = "the server did not use the checksum type specified in "
1620                       "the original request - likely a protocol problem";
1621         else if (new_cksum == server_cksum)
1622                 msg = "changed on the client after we checksummed it - "
1623                       "likely false positive due to mmap IO (bug 11742)";
1624         else if (new_cksum == client_cksum)
1625                 msg = "changed in transit before arrival at OST";
1626         else
1627                 msg = "changed in transit AND doesn't match the original - "
1628                       "likely false positive due to mmap IO (bug 11742)";
1629
1630         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1631                            DFID " object "DOSTID" extent [%llu-%llu], original "
1632                            "client csum %x (type %x), server csum %x (type %x),"
1633                            " client csum now %x\n",
1634                            obd_name, msg, libcfs_nid2str(peer->nid),
1635                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1636                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1637                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1638                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1639                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1640                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1641                            client_cksum,
1642                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1643                            server_cksum, cksum_type, new_cksum);
1644         return 1;
1645 }
1646
1647 /* Note rc enters this function as number of bytes transferred */
1648 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1649 {
1650         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1651         struct client_obd *cli = aa->aa_cli;
1652         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1653         const struct lnet_process_id *peer =
1654                 &req->rq_import->imp_connection->c_peer;
1655         struct ost_body *body;
1656         u32 client_cksum = 0;
1657         ENTRY;
1658
1659         if (rc < 0 && rc != -EDQUOT) {
1660                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1661                 RETURN(rc);
1662         }
1663
1664         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1665         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1666         if (body == NULL) {
1667                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1668                 RETURN(-EPROTO);
1669         }
1670
1671         /* set/clear over quota flag for a uid/gid/projid */
1672         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1673             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1674                 unsigned qid[LL_MAXQUOTAS] = {
1675                                          body->oa.o_uid, body->oa.o_gid,
1676                                          body->oa.o_projid };
1677                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1678                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1679                        body->oa.o_valid, body->oa.o_flags);
1680                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1681                                 body->oa.o_flags);
1682         }
1683
1684         osc_update_grant(cli, body);
1685
1686         if (rc < 0)
1687                 RETURN(rc);
1688
1689         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1690                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1691
1692         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1693                 if (rc > 0) {
1694                         CERROR("Unexpected +ve rc %d\n", rc);
1695                         RETURN(-EPROTO);
1696                 }
1697
1698                 if (req->rq_bulk != NULL &&
1699                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1700                         RETURN(-EAGAIN);
1701
1702                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1703                     check_write_checksum(&body->oa, peer, client_cksum,
1704                                          body->oa.o_cksum, aa))
1705                         RETURN(-EAGAIN);
1706
1707                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1708                                      aa->aa_page_count, aa->aa_ppga);
1709                 GOTO(out, rc);
1710         }
1711
1712         /* The rest of this function executes only for OST_READs */
1713
1714         if (req->rq_bulk == NULL) {
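                /* short i/o: the reply carries the data inline, so the byte
                 * count is the size of the RMF_SHORT_IO reply buffer rather
                 * than a bulk transfer count */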
1715                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1716                                           RCL_SERVER);
1717                 LASSERT(rc == req->rq_status);
1718         } else {
1719                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1720                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1721         }
1722         if (rc < 0)
1723                 GOTO(out, rc = -EAGAIN);
1724
1725         if (rc > aa->aa_requested_nob) {
1726                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1727                        aa->aa_requested_nob);
1728                 RETURN(-EPROTO);
1729         }
1730
1731         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1732                 CERROR("Unexpected rc %d (%d transferred)\n",
1733                        rc, req->rq_bulk->bd_nob_transferred);
1734                 RETURN(-EPROTO);
1735         }
1736
1737         if (req->rq_bulk == NULL) {
1738                 /* short io */
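                /* copy the inline reply data from the RMF_SHORT_IO buffer
                 * into the destination pages, mapping each page in turn */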
1739                 int nob, pg_count, i = 0;
1740                 unsigned char *buf;
1741
1742                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1743                 pg_count = aa->aa_page_count;
1744                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1745                                                    rc);
1746                 nob = rc;
1747                 while (nob > 0 && pg_count > 0) {
1748                         unsigned char *ptr;
1749                         int count = aa->aa_ppga[i]->count > nob ?
1750                                     nob : aa->aa_ppga[i]->count;
1751
1752                         CDEBUG(D_CACHE, "page %p count %d\n",
1753                                aa->aa_ppga[i]->pg, count);
1754                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1755                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1756                                count);
1757                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1758
1759                         buf += count;
1760                         nob -= count;
1761                         i++;
1762                         pg_count--;
1763                 }
1764         }
1765
1766         if (rc < aa->aa_requested_nob)
1767                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1768
1769         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1770                 static int cksum_counter;
1771                 u32        server_cksum = body->oa.o_cksum;
1772                 char      *via = "";
1773                 char      *router = "";
1774                 enum cksum_types cksum_type;
1775                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1776                         body->oa.o_flags : 0;
1777
1778                 cksum_type = obd_cksum_type_unpack(o_flags);
1779                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1780                                           aa->aa_page_count, aa->aa_ppga,
1781                                           OST_READ, &client_cksum);
1782                 if (rc < 0)
1783                         GOTO(out, rc);
1784
1785                 if (req->rq_bulk != NULL &&
1786                     peer->nid != req->rq_bulk->bd_sender) {
1787                         via = " via ";
1788                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1789                 }
1790
1791                 if (server_cksum != client_cksum) {
1792                         struct ost_body *clbody;
1793                         u32 page_count = aa->aa_page_count;
1794
1795                         clbody = req_capsule_client_get(&req->rq_pill,
1796                                                         &RMF_OST_BODY);
1797                         if (cli->cl_checksum_dump)
1798                                 dump_all_bulk_pages(&clbody->oa, page_count,
1799                                                     aa->aa_ppga, server_cksum,
1800                                                     client_cksum);
1801
1802                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1803                                            "%s%s%s inode "DFID" object "DOSTID
1804                                            " extent [%llu-%llu], client %x, "
1805                                            "server %x, cksum_type %x\n",
1806                                            obd_name,
1807                                            libcfs_nid2str(peer->nid),
1808                                            via, router,
1809                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1810                                                 clbody->oa.o_parent_seq : 0ULL,
1811                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1812                                                 clbody->oa.o_parent_oid : 0,
1813                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1814                                                 clbody->oa.o_parent_ver : 0,
1815                                            POSTID(&body->oa.o_oi),
1816                                            aa->aa_ppga[0]->off,
1817                                            aa->aa_ppga[page_count-1]->off +
1818                                            aa->aa_ppga[page_count-1]->count - 1,
1819                                            client_cksum, server_cksum,
1820                                            cksum_type);
1821                         cksum_counter = 0;
1822                         aa->aa_oa->o_cksum = client_cksum;
1823                         rc = -EAGAIN;
1824                 } else {
1825                         cksum_counter++;
1826                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1827                         rc = 0;
1828                 }
1829         } else if (unlikely(client_cksum)) {
1830                 static int cksum_missed;
1831
1832                 cksum_missed++;
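                /* x & -x isolates the lowest set bit, so the test below is
                 * true only when cksum_missed is a power of two: the error
                 * is logged at exponentially decreasing frequency. */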
1833                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1834                         CERROR("Checksum %u requested from %s but not sent\n",
1835                                cksum_missed, libcfs_nid2str(peer->nid));
1836         } else {
1837                 rc = 0;
1838         }
1839 out:
1840         if (rc >= 0)
1841                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1842                                      aa->aa_oa, &body->oa);
1843
1844         RETURN(rc);
1845 }
1846
1847 static int osc_brw_redo_request(struct ptlrpc_request *request,
1848                                 struct osc_brw_async_args *aa, int rc)
1849 {
1850         struct ptlrpc_request *new_req;
1851         struct osc_brw_async_args *new_aa;
1852         struct osc_async_page *oap;
1853         ENTRY;
1854
1855         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1856                   "redo for recoverable error %d", rc);
1857
1858         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1859                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1860                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1861                                   aa->aa_ppga, &new_req, 1);
1862         if (rc)
1863                 RETURN(rc);
1864
1865         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1866                 if (oap->oap_request != NULL) {
1867                         LASSERTF(request == oap->oap_request,
1868                                  "request %p != oap_request %p\n",
1869                                  request, oap->oap_request);
1870                         if (oap->oap_interrupted) {
1871                                 ptlrpc_req_finished(new_req);
1872                                 RETURN(-EINTR);
1873                         }
1874                 }
1875         }
1876         /* The new request takes over pga and oaps from the old request.
1877          * Note that copying a list_head doesn't work; it has to be moved. */
1878         aa->aa_resends++;
1879         new_req->rq_interpret_reply = request->rq_interpret_reply;
1880         new_req->rq_async_args = request->rq_async_args;
1881         new_req->rq_commit_cb = request->rq_commit_cb;
1882         /* cap resend delay to the current request timeout; this is similar
1883          * to what ptlrpc does (see after_reply()) */
1884         if (aa->aa_resends > new_req->rq_timeout)
1885                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1886         else
1887                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1888         new_req->rq_generation_set = 1;
1889         new_req->rq_import_generation = request->rq_import_generation;
1890
1891         new_aa = ptlrpc_req_async_args(new_req);
1892
1893         INIT_LIST_HEAD(&new_aa->aa_oaps);
1894         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1895         INIT_LIST_HEAD(&new_aa->aa_exts);
1896         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1897         new_aa->aa_resends = aa->aa_resends;
1898
1899         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1900                 if (oap->oap_request) {
1901                         ptlrpc_req_finished(oap->oap_request);
1902                         oap->oap_request = ptlrpc_request_addref(new_req);
1903                 }
1904         }
1905
1906         /* XXX: This code will run into problems if we ever support adding
1907          * a series of BRW RPCs to a self-defined ptlrpc_request_set and
1908          * waiting for all of them to finish. We should inherit the request
1909          * set from the old request. */
1910         ptlrpcd_add_req(new_req);
1911
1912         DEBUG_REQ(D_INFO, new_req, "new request");
1913         RETURN(0);
1914 }
1915
1916 /*
1917  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1918  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1919  * fine for our small page arrays and doesn't require allocation.  It's an
1920  * insertion sort that swaps elements that are strides apart, shrinking the
1921  * stride down until it's '1' and the array is sorted.
1922  */
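/*
 * Illustrative example: for num = 100 the seeding loop below yields strides
 * 1, 4, 13, 40, 121 and stops at 121; the sorting loop then uses strides
 * 40, 13, 4 and finally 1, at which point it is a plain insertion sort over
 * an almost-sorted array.
 */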
1923 static void sort_brw_pages(struct brw_page **array, int num)
1924 {
1925         int stride, i, j;
1926         struct brw_page *tmp;
1927
1928         if (num == 1)
1929                 return;
1930         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1931                 ;
1932
1933         do {
1934                 stride /= 3;
1935                 for (i = stride ; i < num ; i++) {
1936                         tmp = array[i];
1937                         j = i;
1938                         while (j >= stride && array[j - stride]->off > tmp->off) {
1939                                 array[j] = array[j - stride];
1940                                 j -= stride;
1941                         }
1942                         array[j] = tmp;
1943                 }
1944         } while (stride > 1);
1945 }
1946
1947 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1948 {
1949         LASSERT(ppga != NULL);
1950         OBD_FREE(ppga, sizeof(*ppga) * count);
1951 }
1952
1953 static int brw_interpret(const struct lu_env *env,
1954                          struct ptlrpc_request *req, void *data, int rc)
1955 {
1956         struct osc_brw_async_args *aa = data;
1957         struct osc_extent *ext;
1958         struct osc_extent *tmp;
1959         struct client_obd *cli = aa->aa_cli;
1960         unsigned long           transferred = 0;
1961         ENTRY;
1962
1963         rc = osc_brw_fini_request(req, rc);
1964         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1965         /* When the server returns -EINPROGRESS, the client should always
1966          * retry, regardless of how many times the bulk was already resent. */
1967         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
1968                 if (req->rq_import_generation !=
1969                     req->rq_import->imp_generation) {
1970                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1971                                DOSTID", rc = %d.\n",
1972                                req->rq_import->imp_obd->obd_name,
1973                                POSTID(&aa->aa_oa->o_oi), rc);
1974                 } else if (rc == -EINPROGRESS ||
1975                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1976                         rc = osc_brw_redo_request(req, aa, rc);
1977                 } else {
1978                         CERROR("%s: too many resent retries for object: "
1979                                "%llu:%llu, rc = %d.\n",
1980                                req->rq_import->imp_obd->obd_name,
1981                                POSTID(&aa->aa_oa->o_oi), rc);
1982                 }
1983
1984                 if (rc == 0)
1985                         RETURN(0);
1986                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1987                         rc = -EIO;
1988         }
1989
1990         if (rc == 0) {
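                /* fold the attributes returned in the reply obdo back into
                 * the cl_object attributes; for writes this may also extend
                 * the apparent file size and the KMS */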
1991                 struct obdo *oa = aa->aa_oa;
1992                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1993                 unsigned long valid = 0;
1994                 struct cl_object *obj;
1995                 struct osc_async_page *last;
1996
1997                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1998                 obj = osc2cl(last->oap_obj);
1999
2000                 cl_object_attr_lock(obj);
2001                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002                         attr->cat_blocks = oa->o_blocks;
2003                         valid |= CAT_BLOCKS;
2004                 }
2005                 if (oa->o_valid & OBD_MD_FLMTIME) {
2006                         attr->cat_mtime = oa->o_mtime;
2007                         valid |= CAT_MTIME;
2008                 }
2009                 if (oa->o_valid & OBD_MD_FLATIME) {
2010                         attr->cat_atime = oa->o_atime;
2011                         valid |= CAT_ATIME;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLCTIME) {
2014                         attr->cat_ctime = oa->o_ctime;
2015                         valid |= CAT_CTIME;
2016                 }
2017
2018                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2019                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2020                         loff_t last_off = last->oap_count + last->oap_obj_off +
2021                                 last->oap_page_off;
2022
2023                         /* Change the file size if this is an out-of-quota
2024                          * or direct IO write and it extends the file size */
2025                         if (loi->loi_lvb.lvb_size < last_off) {
2026                                 attr->cat_size = last_off;
2027                                 valid |= CAT_SIZE;
2028                         }
2029                         /* Extend KMS if it's not a lockless write */
2030                         if (loi->loi_kms < last_off &&
2031                             oap2osc_page(last)->ops_srvlock == 0) {
2032                                 attr->cat_kms = last_off;
2033                                 valid |= CAT_KMS;
2034                         }
2035                 }
2036
2037                 if (valid != 0)
2038                         cl_object_attr_update(env, obj, attr, valid);
2039                 cl_object_attr_unlock(obj);
2040         }
2041         OBDO_FREE(aa->aa_oa);
2042
2043         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2044                 osc_inc_unstable_pages(req);
2045
2046         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2047                 list_del_init(&ext->oe_link);
2048                 osc_extent_finish(env, ext, 1,
2049                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2050         }
2051         LASSERT(list_empty(&aa->aa_exts));
2052         LASSERT(list_empty(&aa->aa_oaps));
2053
2054         transferred = (req->rq_bulk == NULL ? /* short io */
2055                        aa->aa_requested_nob :
2056                        req->rq_bulk->bd_nob_transferred);
2057
2058         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2059         ptlrpc_lprocfs_brw(req, transferred);
2060
2061         spin_lock(&cli->cl_loi_list_lock);
2062         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2063          * is called so we know whether to go to sync BRWs or wait for more
2064          * RPCs to complete */
2065         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2066                 cli->cl_w_in_flight--;
2067         else
2068                 cli->cl_r_in_flight--;
2069         osc_wake_cache_waiters(cli);
2070         spin_unlock(&cli->cl_loi_list_lock);
2071
2072         osc_io_unplug(env, cli, NULL);
2073         RETURN(rc);
2074 }
2075
2076 static void brw_commit(struct ptlrpc_request *req)
2077 {
2078         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2079          * this callback being run via rq_commit_cb, we need to ensure
2080          * osc_dec_unstable_pages is still called. Otherwise unstable
2081          * pages may be leaked. */
2082         spin_lock(&req->rq_lock);
2083         if (likely(req->rq_unstable)) {
2084                 req->rq_unstable = 0;
2085                 spin_unlock(&req->rq_lock);
2086
2087                 osc_dec_unstable_pages(req);
2088         } else {
2089                 req->rq_committed = 1;
2090                 spin_unlock(&req->rq_lock);
2091         }
2092 }
2093
2094 /**
2095  * Build an RPC from the list of extents @ext_list. The caller must ensure
2096  * that the total pages in this list do NOT exceed the max pages per RPC.
2097  * Extents in the list must be in OES_RPC state.
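 *
 * Roughly (summarizing the body below): collect the pages of all extents
 * into a brw_page array, sort them by file offset, build the BRW request
 * with osc_brw_prep_request(), account it in the read/write in-flight
 * statistics, and hand it off to ptlrpcd.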
2098  */
2099 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2100                   struct list_head *ext_list, int cmd)
2101 {
2102         struct ptlrpc_request           *req = NULL;
2103         struct osc_extent               *ext;
2104         struct brw_page                 **pga = NULL;
2105         struct osc_brw_async_args       *aa = NULL;
2106         struct obdo                     *oa = NULL;
2107         struct osc_async_page           *oap;
2108         struct osc_object               *obj = NULL;
2109         struct cl_req_attr              *crattr = NULL;
2110         loff_t                          starting_offset = OBD_OBJECT_EOF;
2111         loff_t                          ending_offset = 0;
2112         int                             mpflag = 0;
2113         int                             mem_tight = 0;
2114         int                             page_count = 0;
2115         bool                            soft_sync = false;
2116         bool                            interrupted = false;
2117         bool                            ndelay = false;
2118         int                             i;
2119         int                             grant = 0;
2120         int                             rc;
2121         __u32                           layout_version = 0;
2122         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2123         struct ost_body                 *body;
2124         ENTRY;
2125         LASSERT(!list_empty(ext_list));
2126
2127         /* add pages into rpc_list to build BRW rpc */
2128         list_for_each_entry(ext, ext_list, oe_link) {
2129                 LASSERT(ext->oe_state == OES_RPC);
2130                 mem_tight |= ext->oe_memalloc;
2131                 grant += ext->oe_grants;
2132                 page_count += ext->oe_nr_pages;
2133                 layout_version = MAX(layout_version, ext->oe_layout_version);
2134                 if (obj == NULL)
2135                         obj = ext->oe_obj;
2136         }
2137
2138         soft_sync = osc_over_unstable_soft_limit(cli);
2139         if (mem_tight)
2140                 mpflag = cfs_memory_pressure_get_and_set();
2141
2142         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2143         if (pga == NULL)
2144                 GOTO(out, rc = -ENOMEM);
2145
2146         OBDO_ALLOC(oa);
2147         if (oa == NULL)
2148                 GOTO(out, rc = -ENOMEM);
2149
2150         i = 0;
2151         list_for_each_entry(ext, ext_list, oe_link) {
2152                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2153                         if (mem_tight)
2154                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2155                         if (soft_sync)
2156                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2157                         pga[i] = &oap->oap_brw_page;
2158                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2159                         i++;
2160
2161                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2162                         if (starting_offset == OBD_OBJECT_EOF ||
2163                             starting_offset > oap->oap_obj_off)
2164                                 starting_offset = oap->oap_obj_off;
2165                         else
2166                                 LASSERT(oap->oap_page_off == 0);
2167                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2168                                 ending_offset = oap->oap_obj_off +
2169                                                 oap->oap_count;
2170                         else
2171                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2172                                         PAGE_SIZE);
2173                         if (oap->oap_interrupted)
2174                                 interrupted = true;
2175                 }
2176                 if (ext->oe_ndelay)
2177                         ndelay = true;
2178         }
2179
2180         /* first page in the list */
2181         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2182
2183         crattr = &osc_env_info(env)->oti_req_attr;
2184         memset(crattr, 0, sizeof(*crattr));
2185         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2186         crattr->cra_flags = ~0ULL;
2187         crattr->cra_page = oap2cl_page(oap);
2188         crattr->cra_oa = oa;
2189         cl_req_attr_set(env, osc2cl(obj), crattr);
2190
2191         if (cmd == OBD_BRW_WRITE) {
2192                 oa->o_grant_used = grant;
2193                 if (layout_version > 0) {
2194                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2195                                PFID(&oa->o_oi.oi_fid), layout_version);
2196
2197                         oa->o_layout_version = layout_version;
2198                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2199                 }
2200         }
2201
2202         sort_brw_pages(pga, page_count);
2203         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2204         if (rc != 0) {
2205                 CERROR("prep_req failed: %d\n", rc);
2206                 GOTO(out, rc);
2207         }
2208
2209         req->rq_commit_cb = brw_commit;
2210         req->rq_interpret_reply = brw_interpret;
2211         req->rq_memalloc = mem_tight != 0;
2212         oap->oap_request = ptlrpc_request_addref(req);
2213         if (interrupted && !req->rq_intr)
2214                 ptlrpc_mark_interrupted(req);
2215         if (ndelay) {
2216                 req->rq_no_resend = req->rq_no_delay = 1;
2217                 /* We should probably set a shorter timeout value to handle
2218                  * ETIMEDOUT in brw_interpret() correctly. */
2219                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2220         }
2221
2222         /* Need to update the timestamps after the request is built in case
2223          * we race with setattr (locally or in the queue at the OST).  If the
2224          * OST gets the later setattr before the earlier BRW (as determined by
2225          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2226          * there is no obvious way to do this in a single call.  bug 10150 */
2227         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2228         crattr->cra_oa = &body->oa;
2229         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2230         cl_req_attr_set(env, osc2cl(obj), crattr);
2231         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2232
2233         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2234         aa = ptlrpc_req_async_args(req);
2235         INIT_LIST_HEAD(&aa->aa_oaps);
2236         list_splice_init(&rpc_list, &aa->aa_oaps);
2237         INIT_LIST_HEAD(&aa->aa_exts);
2238         list_splice_init(ext_list, &aa->aa_exts);
2239
2240         spin_lock(&cli->cl_loi_list_lock);
2241         starting_offset >>= PAGE_SHIFT;
2242         if (cmd == OBD_BRW_READ) {
2243                 cli->cl_r_in_flight++;
2244                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2245                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2246                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2247                                       starting_offset + 1);
2248         } else {
2249                 cli->cl_w_in_flight++;
2250                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2251                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2252                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2253                                       starting_offset + 1);
2254         }
2255         spin_unlock(&cli->cl_loi_list_lock);
2256
2257         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2258                   page_count, aa, cli->cl_r_in_flight,
2259                   cli->cl_w_in_flight);
2260         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2261
2262         ptlrpcd_add_req(req);
2263         rc = 0;
2264         EXIT;
2265
2266 out:
2267         if (mem_tight != 0)
2268                 cfs_memory_pressure_restore(mpflag);
2269
2270         if (rc != 0) {
2271                 LASSERT(req == NULL);
2272
2273                 if (oa)
2274                         OBDO_FREE(oa);
2275                 if (pga)
2276                         OBD_FREE(pga, sizeof(*pga) * page_count);
2277                 /* this should happen rarely and is pretty bad; it makes the
2278                  * pending list not follow the dirty order */
2279                 while (!list_empty(ext_list)) {
2280                         ext = list_entry(ext_list->next, struct osc_extent,
2281                                          oe_link);
2282                         list_del_init(&ext->oe_link);
2283                         osc_extent_finish(env, ext, 0, rc);
2284                 }
2285         }
2286         RETURN(rc);
2287 }
2288
2289 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2290 {
2291         int set = 0;
2292
2293         LASSERT(lock != NULL);
2294
2295         lock_res_and_lock(lock);
2296
2297         if (lock->l_ast_data == NULL)
2298                 lock->l_ast_data = data;
2299         if (lock->l_ast_data == data)
2300                 set = 1;
2301
2302         unlock_res_and_lock(lock);
2303
2304         return set;
2305 }
2306
2307 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2308                      void *cookie, struct lustre_handle *lockh,
2309                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2310                      int errcode)
2311 {
2312         bool intent = *flags & LDLM_FL_HAS_INTENT;
2313         int rc;
2314         ENTRY;
2315
2316         /* The request was created before ldlm_cli_enqueue call. */
2317         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2318                 struct ldlm_reply *rep;
2319
2320                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2321                 LASSERT(rep != NULL);
2322
2323                 rep->lock_policy_res1 =
2324                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2325                 if (rep->lock_policy_res1)
2326                         errcode = rep->lock_policy_res1;
2327                 if (!speculative)
2328                         *flags |= LDLM_FL_LVB_READY;
2329         } else if (errcode == ELDLM_OK) {
2330                 *flags |= LDLM_FL_LVB_READY;
2331         }
2332
2333         /* Call the update callback. */
2334         rc = (*upcall)(cookie, lockh, errcode);
2335
2336         /* release the reference taken in ldlm_cli_enqueue() */
2337         if (errcode == ELDLM_LOCK_MATCHED)
2338                 errcode = ELDLM_OK;
2339         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2340                 ldlm_lock_decref(lockh, mode);
2341
2342         RETURN(rc);
2343 }
2344
2345 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2346                           struct osc_enqueue_args *aa, int rc)
2347 {
2348         struct ldlm_lock *lock;
2349         struct lustre_handle *lockh = &aa->oa_lockh;
2350         enum ldlm_mode mode = aa->oa_mode;
2351         struct ost_lvb *lvb = aa->oa_lvb;
2352         __u32 lvb_len = sizeof(*lvb);
2353         __u64 flags = 0;
2354
2355         ENTRY;
2356
2357         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2358          * be valid. */
2359         lock = ldlm_handle2lock(lockh);
2360         LASSERTF(lock != NULL,
2361                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2362                  lockh->cookie, req, aa);
2363
2364         /* Take an additional reference so that a blocking AST that
2365          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2366          * to arrive after an upcall has been executed by
2367          * osc_enqueue_fini(). */
2368         ldlm_lock_addref(lockh, mode);
2369
2370         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2371         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2372
2373         /* Let CP AST to grant the lock first. */
2374         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2375
2376         if (aa->oa_speculative) {
2377                 LASSERT(aa->oa_lvb == NULL);
2378                 LASSERT(aa->oa_flags == NULL);
2379                 aa->oa_flags = &flags;
2380         }
2381
2382         /* Complete obtaining the lock procedure. */
2383         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2384                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2385                                    lockh, rc);
2386         /* Complete osc stuff. */
2387         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2388                               aa->oa_flags, aa->oa_speculative, rc);
2389
2390         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2391
2392         ldlm_lock_decref(lockh, mode);
2393         LDLM_LOCK_PUT(lock);
2394         RETURN(rc);
2395 }
2396
2397 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2398
2399 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2400  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2401  * other synchronous requests, but keeping some locks while trying to obtain
2402  * others may take a considerable amount of time in the case of OST failure;
2403  * and when other sync requests do not get a lock released by a client, the
2404  * client is evicted from the cluster -- such scenarios make life difficult,
2405  * so release locks just after they are obtained. */
2406 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2407                      __u64 *flags, union ldlm_policy_data *policy,
2408                      struct ost_lvb *lvb, int kms_valid,
2409                      osc_enqueue_upcall_f upcall, void *cookie,
2410                      struct ldlm_enqueue_info *einfo,
2411                      struct ptlrpc_request_set *rqset, int async,
2412                      bool speculative)
2413 {
2414         struct obd_device *obd = exp->exp_obd;
2415         struct lustre_handle lockh = { 0 };
2416         struct ptlrpc_request *req = NULL;
2417         int intent = *flags & LDLM_FL_HAS_INTENT;
2418         __u64 match_flags = *flags;
2419         enum ldlm_mode mode;
2420         int rc;
2421         ENTRY;
2422
2423         /* Filesystem lock extents are extended to page boundaries so that
2424          * dealing with the page cache is a little smoother.  */
2425         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2426         policy->l_extent.end |= ~PAGE_MASK;
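        /* Illustration, assuming 4096-byte pages (so ~PAGE_MASK == 0xfff):
         * an extent of [5000, 6000] is widened to [4096, 8191] -- the start
         * is rounded down to its page boundary and the end up to the last
         * byte of its page. */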
2427
2428         /*
2429          * kms is not valid when either object is completely fresh (so that no
2430          * locks are cached), or object was evicted. In the latter case cached
2431          * lock cannot be used, because it would prime inode state with
2432          * potentially stale LVB.
2433          */
2434         if (!kms_valid)
2435                 goto no_match;
2436
2437         /* Next, search for already existing extent locks that will cover us */
2438         /* If we're trying to read, we also search for an existing PW lock.  The
2439          * VFS and page cache already protect us locally, so lots of readers/
2440          * writers can share a single PW lock.
2441          *
2442          * There are problems with conversion deadlocks, so instead of
2443          * converting a read lock to a write lock, we'll just enqueue a new
2444          * one.
2445          *
2446          * At some point we should cancel the read lock instead of making them
2447          * send us a blocking callback, but there are problems with canceling
2448          * locks out from other users right now, too. */
2449         mode = einfo->ei_mode;
2450         if (einfo->ei_mode == LCK_PR)
2451                 mode |= LCK_PW;
2452         /* Normal lock requests must wait for the LVB to be ready before
2453          * matching a lock; speculative lock requests do not need to,
2454          * because they will not actually use the lock. */
2455         if (!speculative)
2456                 match_flags |= LDLM_FL_LVB_READY;
2457         if (intent != 0)
2458                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2459         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2460                                einfo->ei_type, policy, mode, &lockh, 0);
2461         if (mode) {
2462                 struct ldlm_lock *matched;
2463
2464                 if (*flags & LDLM_FL_TEST_LOCK)
2465                         RETURN(ELDLM_OK);
2466
2467                 matched = ldlm_handle2lock(&lockh);
2468                 if (speculative) {
2469                         /* This DLM lock request is speculative, and does not
2470                          * have an associated IO request. Therefore if there
2471                          * is already a DLM lock, it will just inform the
2472                          * caller to cancel the request for this stripe. */
2473                         lock_res_and_lock(matched);
2474                         if (ldlm_extent_equal(&policy->l_extent,
2475                             &matched->l_policy_data.l_extent))
2476                                 rc = -EEXIST;
2477                         else
2478                                 rc = -ECANCELED;
2479                         unlock_res_and_lock(matched);
2480
2481                         ldlm_lock_decref(&lockh, mode);
2482                         LDLM_LOCK_PUT(matched);
2483                         RETURN(rc);
2484                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2485                         *flags |= LDLM_FL_LVB_READY;
2486
2487                         /* We already have a lock, and it's referenced. */
2488                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2489
2490                         ldlm_lock_decref(&lockh, mode);
2491                         LDLM_LOCK_PUT(matched);
2492                         RETURN(ELDLM_OK);
2493                 } else {
2494                         ldlm_lock_decref(&lockh, mode);
2495                         LDLM_LOCK_PUT(matched);
2496                 }
2497         }
2498
2499 no_match:
2500         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2501                 RETURN(-ENOLCK);
2502
2503         if (intent) {
2504                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2505                                            &RQF_LDLM_ENQUEUE_LVB);
2506                 if (req == NULL)
2507                         RETURN(-ENOMEM);
2508
2509                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2510                 if (rc) {
2511                         ptlrpc_request_free(req);
2512                         RETURN(rc);
2513                 }
2514
2515                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2516                                      sizeof(*lvb));
2517                 ptlrpc_request_set_replen(req);
2518         }
2519
2520         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2521         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2522
2523         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2524                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2525         if (async) {
2526                 if (!rc) {
2527                         struct osc_enqueue_args *aa;
2528                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2529                         aa = ptlrpc_req_async_args(req);
2530                         aa->oa_exp         = exp;
2531                         aa->oa_mode        = einfo->ei_mode;
2532                         aa->oa_type        = einfo->ei_type;
2533                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2534                         aa->oa_upcall      = upcall;
2535                         aa->oa_cookie      = cookie;
2536                         aa->oa_speculative = speculative;
2537                         if (!speculative) {
2538                                 aa->oa_flags  = flags;
2539                                 aa->oa_lvb    = lvb;
2540                         } else {
2541                                 /* speculative locks essentially enqueue
2542                                  * a DLM lock in advance, so we don't care
2543                                  * about the result of the enqueue. */
2544                                 aa->oa_lvb    = NULL;
2545                                 aa->oa_flags  = NULL;
2546                         }
2547
2548                         req->rq_interpret_reply =
2549                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2550                         if (rqset == PTLRPCD_SET)
2551                                 ptlrpcd_add_req(req);
2552                         else
2553                                 ptlrpc_set_add_req(rqset, req);
2554                 } else if (intent) {
2555                         ptlrpc_req_finished(req);
2556                 }
2557                 RETURN(rc);
2558         }
2559
2560         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2561                               flags, speculative, rc);
2562         if (intent)
2563                 ptlrpc_req_finished(req);
2564
2565         RETURN(rc);
2566 }
2567
2568 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2569                    enum ldlm_type type, union ldlm_policy_data *policy,
2570                    enum ldlm_mode mode, __u64 *flags, void *data,
2571                    struct lustre_handle *lockh, int unref)
2572 {
2573         struct obd_device *obd = exp->exp_obd;
2574         __u64 lflags = *flags;
2575         enum ldlm_mode rc;
2576         ENTRY;
2577
2578         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2579                 RETURN(-EIO);
2580
2581         /* Filesystem lock extents are extended to page boundaries so that
2582          * dealing with the page cache is a little smoother */
2583         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2584         policy->l_extent.end |= ~PAGE_MASK;
2585
2586         /* Next, search for already existing extent locks that will cover us */
2587         /* If we're trying to read, we also search for an existing PW lock.  The
2588          * VFS and page cache already protect us locally, so lots of readers/
2589          * writers can share a single PW lock. */
2590         rc = mode;
2591         if (mode == LCK_PR)
2592                 rc |= LCK_PW;
2593         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2594                              res_id, type, policy, rc, lockh, unref);
2595         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2596                 RETURN(rc);
2597
2598         if (data != NULL) {
2599                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2600
2601                 LASSERT(lock != NULL);
2602                 if (!osc_set_lock_data(lock, data)) {
2603                         ldlm_lock_decref(lockh, rc);
2604                         rc = 0;
2605                 }
2606                 LDLM_LOCK_PUT(lock);
2607         }
2608         RETURN(rc);
2609 }
2610
2611 static int osc_statfs_interpret(const struct lu_env *env,
2612                                 struct ptlrpc_request *req,
2613                                 struct osc_async_args *aa, int rc)
2614 {
2615         struct obd_statfs *msfs;
2616         ENTRY;
2617
2618         if (rc == -EBADR)
2619                 /* The request has in fact never been sent
2620                  * due to issues at a higher level (LOV).
2621                  * Exit immediately since the caller is
2622                  * aware of the problem and takes care
2623                  * of the cleanup. */
2624                 RETURN(rc);
2625
2626         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2627             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2628                 GOTO(out, rc = 0);
2629
2630         if (rc != 0)
2631                 GOTO(out, rc);
2632
2633         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2634         if (msfs == NULL)
2635                 GOTO(out, rc = -EPROTO);
2637
2638         *aa->aa_oi->oi_osfs = *msfs;
2639 out:
2640         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2641         RETURN(rc);
2642 }
2643
2644 static int osc_statfs_async(struct obd_export *exp,
2645                             struct obd_info *oinfo, time64_t max_age,
2646                             struct ptlrpc_request_set *rqset)
2647 {
2648         struct obd_device     *obd = class_exp2obd(exp);
2649         struct ptlrpc_request *req;
2650         struct osc_async_args *aa;
2651         int                    rc;
2652         ENTRY;
2653
2654         /* We could possibly pass max_age in the request (as an absolute
2655          * timestamp or a "seconds.usec ago") so the target can avoid doing
2656          * extra calls into the filesystem when they aren't necessary (e.g.
2657          * during mount, where that would help a bit).  Relative timestamps
2658          * are not so great if request processing is slow, while absolute
2659          * timestamps are not ideal because they need time synchronization. */
2660         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2661         if (req == NULL)
2662                 RETURN(-ENOMEM);
2663
2664         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2665         if (rc) {
2666                 ptlrpc_request_free(req);
2667                 RETURN(rc);
2668         }
2669         ptlrpc_request_set_replen(req);
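        /* Send statfs to the OST_CREATE portal rather than the I/O portal,
         * presumably so it is not queued behind a backlog of bulk I/O
         * requests. */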
2670         req->rq_request_portal = OST_CREATE_PORTAL;
2671         ptlrpc_at_set_req_timeout(req);
2672
2673         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2674                 /* procfs requests must not block, to avoid deadlock */
2675                 req->rq_no_resend = 1;
2676                 req->rq_no_delay = 1;
2677         }
2678
2679         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
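        /* The async args live in space embedded in the request itself;
         * the CLASSERT below checks at compile time that they fit. */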
2680         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2681         aa = ptlrpc_req_async_args(req);
2682         aa->aa_oi = oinfo;
2683
2684         ptlrpc_set_add_req(rqset, req);
2685         RETURN(0);
2686 }
2687
2688 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2689                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2690 {
2691         struct obd_device     *obd = class_exp2obd(exp);
2692         struct obd_statfs     *msfs;
2693         struct ptlrpc_request *req;
2694         struct obd_import     *imp = NULL;
2695         int rc;
2696         ENTRY;
2697
2698         /* Since the request might also come from lprocfs, we need to
2699          * sync this with client_disconnect_export() (bug 15684) */
2700         down_read(&obd->u.cli.cl_sem);
2701         if (obd->u.cli.cl_import)
2702                 imp = class_import_get(obd->u.cli.cl_import);
2703         up_read(&obd->u.cli.cl_sem);
2704         if (!imp)
2705                 RETURN(-ENODEV);
2706
2707         /* We could possibly pass max_age in the request (as an absolute
2708          * timestamp or a "seconds.usec ago") so the target can avoid doing
2709          * extra calls into the filesystem when they aren't necessary (e.g.
2710          * during mount, where that would help a bit).  Relative timestamps
2711          * are not so great if request processing is slow, while absolute
2712          * timestamps are not ideal because they need time synchronization. */
2713         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2714
2715         class_import_put(imp);
2716
2717         if (req == NULL)
2718                 RETURN(-ENOMEM);
2719
2720         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2721         if (rc) {
2722                 ptlrpc_request_free(req);
2723                 RETURN(rc);
2724         }
2725         ptlrpc_request_set_replen(req);
2726         req->rq_request_portal = OST_CREATE_PORTAL;
2727         ptlrpc_at_set_req_timeout(req);
2728
2729         if (flags & OBD_STATFS_NODELAY) {
2730                 /* procfs requests must not block, to avoid deadlock */
2731                 req->rq_no_resend = 1;
2732                 req->rq_no_delay = 1;
2733         }
2734
2735         rc = ptlrpc_queue_wait(req);
2736         if (rc)
2737                 GOTO(out, rc);
2738
2739         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2740         if (msfs == NULL)
2741                 GOTO(out, rc = -EPROTO);
2743
2744         *osfs = *msfs;
2745
2746         EXIT;
2747  out:
2748         ptlrpc_req_finished(req);
2749         return rc;
2750 }
2751
2752 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2753                          void *karg, void __user *uarg)
2754 {
2755         struct obd_device *obd = exp->exp_obd;
2756         struct obd_ioctl_data *data = karg;
2757         int err = 0;
2758         ENTRY;
2759
2760         if (!try_module_get(THIS_MODULE)) {
2761                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2762                        module_name(THIS_MODULE));
2763                 return -EINVAL;
2764         }
2765         switch (cmd) {
2766         case OBD_IOC_CLIENT_RECOVER:
2767                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2768                                             data->ioc_inlbuf1, 0);
2769                 if (err > 0)
2770                         err = 0;
2771                 GOTO(out, err);
2772         case IOC_OSC_SET_ACTIVE:
2773                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2774                                                data->ioc_offset);
2775                 GOTO(out, err);
2776         case OBD_IOC_PING_TARGET:
2777                 err = ptlrpc_obd_ping(obd);
2778                 GOTO(out, err);
2779         default:
2780                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2781                        cmd, current_comm());
2782                 GOTO(out, err = -ENOTTY);
2783         }
2784 out:
2785         module_put(THIS_MODULE);
2786         return err;
2787 }
2788
2789 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2790                        u32 keylen, void *key, u32 vallen, void *val,
2791                        struct ptlrpc_request_set *set)
2792 {
2793         struct ptlrpc_request *req;
2794         struct obd_device     *obd = exp->exp_obd;
2795         struct obd_import     *imp = class_exp2cliimp(exp);
2796         char                  *tmp;
2797         int                    rc;
2798         ENTRY;
2799
2800         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2801
2802         if (KEY_IS(KEY_CHECKSUM)) {
2803                 if (vallen != sizeof(int))
2804                         RETURN(-EINVAL);
2805                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2806                 RETURN(0);
2807         }
2808
2809         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2810                 sptlrpc_conf_client_adapt(obd);
2811                 RETURN(0);
2812         }
2813
2814         if (KEY_IS(KEY_FLUSH_CTX)) {
2815                 sptlrpc_import_flush_my_ctx(imp);
2816                 RETURN(0);
2817         }
2818
2819         if (KEY_IS(KEY_CACHE_SET)) {
2820                 struct client_obd *cli = &obd->u.cli;
2821
2822                 LASSERT(cli->cl_cache == NULL); /* only once */
2823                 cli->cl_cache = (struct cl_client_cache *)val;
2824                 cl_cache_incref(cli->cl_cache);
2825                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2826
2827                 /* add this osc into entity list */
2828                 LASSERT(list_empty(&cli->cl_lru_osc));
2829                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2830                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2831                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2832
2833                 RETURN(0);
2834         }
2835
2836         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2837                 struct client_obd *cli = &obd->u.cli;
2838                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2839                 long target = *(long *)val;
2840
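                /* Shrink at most half of the pages currently on this
                 * client's LRU list, and report progress back by
                 * decrementing the caller's remaining target. */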
2841                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2842                 *(long *)val -= nr;
2843                 RETURN(0);
2844         }
2845
2846         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2847                 RETURN(-EINVAL);
2848
2849         /* We pass all other commands directly to the OST. Since nobody calls
2850          * OSC methods directly and everybody is supposed to go through LOV,
2851          * we assume LOV checked invalid values for us.
2852          * The only recognised values so far are evict_by_nid and mds_conn.
2853          * Even if something bad goes through, we'd get a -EINVAL from the
2854          * OST anyway. */
2855
2856         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2857                                                 &RQF_OST_SET_GRANT_INFO :
2858                                                 &RQF_OBD_SET_INFO);
2859         if (req == NULL)
2860                 RETURN(-ENOMEM);
2861
2862         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2863                              RCL_CLIENT, keylen);
2864         if (!KEY_IS(KEY_GRANT_SHRINK))
2865                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2866                                      RCL_CLIENT, vallen);
2867         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2868         if (rc) {
2869                 ptlrpc_request_free(req);
2870                 RETURN(rc);
2871         }
2872
2873         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2874         memcpy(tmp, key, keylen);
2875         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2876                                                         &RMF_OST_BODY :
2877                                                         &RMF_SETINFO_VAL);
2878         memcpy(tmp, val, vallen);
2879
2880         if (KEY_IS(KEY_GRANT_SHRINK)) {
2881                 struct osc_grant_args *aa;
2882                 struct obdo *oa;
2883
2884                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2885                 aa = ptlrpc_req_async_args(req);
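                /* Copy the obdo out of the caller's buffer: the request
                 * completes asynchronously via ptlrpcd, after 'val' may
                 * already be gone. */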
2886                 OBDO_ALLOC(oa);
2887                 if (!oa) {
2888                         ptlrpc_req_finished(req);
2889                         RETURN(-ENOMEM);
2890                 }
2891                 *oa = ((struct ost_body *)val)->oa;
2892                 aa->aa_oa = oa;
2893                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2894         }
2895
2896         ptlrpc_request_set_replen(req);
2897         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2898                 LASSERT(set != NULL);
2899                 ptlrpc_set_add_req(set, req);
2900                 ptlrpc_check_set(NULL, set);
2901         } else {
2902                 ptlrpcd_add_req(req);
2903         }
2904
2905         RETURN(0);
2906 }
2907 EXPORT_SYMBOL(osc_set_info_async);
2908
2909 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2910                   struct obd_device *obd, struct obd_uuid *cluuid,
2911                   struct obd_connect_data *data, void *localdata)
2912 {
2913         struct client_obd *cli = &obd->u.cli;
2914
2915         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
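                /* Ask the server to restore the grant this client already
                 * held before the reconnect (unused + reserved + dirty);
                 * if none was held, fall back to two full RPCs worth. */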
2916                 long lost_grant;
2917                 long grant;
2918
2919                 spin_lock(&cli->cl_loi_list_lock);
2920                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2921                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2922                         grant += cli->cl_dirty_grant;
2923                 else
2924                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2925                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2926                 lost_grant = cli->cl_lost_grant;
2927                 cli->cl_lost_grant = 0;
2928                 spin_unlock(&cli->cl_loi_list_lock);
2929
2930                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2931                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2932                        data->ocd_version, data->ocd_grant, lost_grant);
2933         }
2934
2935         RETURN(0);
2936 }
2937 EXPORT_SYMBOL(osc_reconnect);
2938
2939 int osc_disconnect(struct obd_export *exp)
2940 {
2941         struct obd_device *obd = class_exp2obd(exp);
2942         int rc;
2943
2944         rc = client_disconnect_export(exp);
2945         /**
2946          * Initially we put del_shrink_grant before disconnect_export, but it
2947          * causes the following problem if setup (connect) and cleanup
2948          * (disconnect) are tangled together.
2949          *      connect p1                     disconnect p2
2950          *   ptlrpc_connect_import
2951          *     ...............               class_manual_cleanup
2952          *                                     osc_disconnect
2953          *                                     del_shrink_grant
2954          *   ptlrpc_connect_interpret
2955          *     init_grant_shrink
2956          *   add this client to shrink list
2957          *                                      cleanup_osc
2958          * Bang! the pinger triggers the shrink.
2959          * So the OSC should be removed from the shrink list only after we
2960          * are sure the import has been destroyed (bug 18662).
2961          */
2962         if (obd->u.cli.cl_import == NULL)
2963                 osc_del_shrink_grant(&obd->u.cli);
2964         return rc;
2965 }
2966 EXPORT_SYMBOL(osc_disconnect);
2967
2968 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2969                                  struct hlist_node *hnode, void *arg)
2970 {
2971         struct lu_env *env = arg;
2972         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2973         struct ldlm_lock *lock;
2974         struct osc_object *osc = NULL;
2975         ENTRY;
2976
2977         lock_res(res);
2978         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2979                 if (lock->l_ast_data != NULL && osc == NULL) {
2980                         osc = lock->l_ast_data;
2981                         cl_object_get(osc2cl(osc));
2982                 }
2983
2984                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
2985                  * be canceled by the 2nd round of ldlm_namespace_cleanup()
2986                  * in osc_import_event(). */
2987                 ldlm_clear_cleaned(lock);
2988         }
2989         unlock_res(res);
2990
2991         if (osc != NULL) {
2992                 osc_object_invalidate(env, osc);
2993                 cl_object_put(env, osc2cl(osc));
2994         }
2995
2996         RETURN(0);
2997 }
2998 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2999
3000 static int osc_import_event(struct obd_device *obd,
3001                             struct obd_import *imp,
3002                             enum obd_import_event event)
3003 {
3004         struct client_obd *cli;
3005         int rc = 0;
3006
3007         ENTRY;
3008         LASSERT(imp->imp_obd == obd);
3009
3010         switch (event) {
3011         case IMP_EVENT_DISCON: {
3012                 cli = &obd->u.cli;
3013                 spin_lock(&cli->cl_loi_list_lock);
3014                 cli->cl_avail_grant = 0;
3015                 cli->cl_lost_grant = 0;
3016                 spin_unlock(&cli->cl_loi_list_lock);
3017                 break;
3018         }
3019         case IMP_EVENT_INACTIVE: {
3020                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3021                 break;
3022         }
3023         case IMP_EVENT_INVALIDATE: {
3024                 struct ldlm_namespace *ns = obd->obd_namespace;
3025                 struct lu_env         *env;
3026                 __u16                  refcheck;
3027
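                /* The first cleanup pass cancels what it can locally; the
                 * resource walk below clears LDLM_FL_CLEANED so that the
                 * second pass can cancel the locks just invalidated. */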
3028                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3029
3030                 env = cl_env_get(&refcheck);
3031                 if (!IS_ERR(env)) {
3032                         osc_io_unplug(env, &obd->u.cli, NULL);
3033
3034                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3035                                                  osc_ldlm_resource_invalidate,
3036                                                  env, 0);
3037                         cl_env_put(env, &refcheck);
3038
3039                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3040                 } else
3041                         rc = PTR_ERR(env);
3042                 break;
3043         }
3044         case IMP_EVENT_ACTIVE: {
3045                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3046                 break;
3047         }
3048         case IMP_EVENT_OCD: {
3049                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3050
3051                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3052                         osc_init_grant(&obd->u.cli, ocd);
3053
3054                 /* See bug 7198 */
3055                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3056                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3057
3058                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3059                 break;
3060         }
3061         case IMP_EVENT_DEACTIVATE: {
3062                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3063                 break;
3064         }
3065         case IMP_EVENT_ACTIVATE: {
3066                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3067                 break;
3068         }
3069         default:
3070                 CERROR("Unknown import event %d\n", event);
3071                 LBUG();
3072         }
3073         RETURN(rc);
3074 }
3075
3076 /**
3077  * Determine whether the lock can be canceled before replaying the lock
3078  * during recovery, see bug16774 for detailed information.
3079  *
3080  * \retval zero the lock can't be canceled
3081  * \retval other ok to cancel
3082  */
3083 static int osc_cancel_weight(struct ldlm_lock *lock)
3084 {
3085         /*
3086          * Cancel all unused and granted extent locks.
3087          */
3088         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3089             lock->l_granted_mode == lock->l_req_mode &&
3090             osc_ldlm_weigh_ast(lock) == 0)
3091                 RETURN(1);
3092
3093         RETURN(0);
3094 }
3095
3096 static int brw_queue_work(const struct lu_env *env, void *data)
3097 {
3098         struct client_obd *cli = data;
3099
3100         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3101
3102         osc_io_unplug(env, cli, NULL);
3103         RETURN(0);
3104 }
3105
3106 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3107 {
3108         struct client_obd *cli = &obd->u.cli;
3109         void *handler;
3110         int rc;
3111
3112         ENTRY;
3113
3114         rc = ptlrpcd_addref();
3115         if (rc)
3116                 RETURN(rc);
3117
3118         rc = client_obd_setup(obd, lcfg);
3119         if (rc)
3120                 GOTO(out_ptlrpcd, rc);
3121
3123         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3124         if (IS_ERR(handler))
3125                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3126         cli->cl_writeback_work = handler;
3127
3128         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3129         if (IS_ERR(handler))
3130                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3131         cli->cl_lru_work = handler;
3132
3133         rc = osc_quota_setup(obd);
3134         if (rc)
3135                 GOTO(out_ptlrpcd_work, rc);
3136
3137         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3138
3139         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3140         RETURN(rc);
3141
3142 out_ptlrpcd_work:
3143         if (cli->cl_writeback_work != NULL) {
3144                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3145                 cli->cl_writeback_work = NULL;
3146         }
3147         if (cli->cl_lru_work != NULL) {
3148                 ptlrpcd_destroy_work(cli->cl_lru_work);
3149                 cli->cl_lru_work = NULL;
3150         }
3151         client_obd_cleanup(obd);
3152 out_ptlrpcd:
3153         ptlrpcd_decref();
3154         RETURN(rc);
3155 }
3156 EXPORT_SYMBOL(osc_setup_common);
3157
3158 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3159 {
3160         struct client_obd *cli = &obd->u.cli;
3161         int                adding;
3162         int                added;
3163         int                req_count;
3164         int                rc;
3165
3166         ENTRY;
3167
3168         rc = osc_setup_common(obd, lcfg);
3169         if (rc < 0)
3170                 RETURN(rc);
3171
3172         rc = osc_tunables_init(obd);
3173         if (rc)
3174                 RETURN(rc);
3175
3176         /*
3177          * We try to control the total number of requests with an upper limit,
3178          * osc_reqpool_maxreqcount. There might be a race that causes over-limit
3179          * allocation, but that is fine.
3180          */
3181         req_count = atomic_read(&osc_pool_req_count);
3182         if (req_count < osc_reqpool_maxreqcount) {
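                /* Grow the shared pool by enough requests for this client's
                 * RPCs in flight (plus a little slack), without exceeding
                 * the global osc_reqpool_maxreqcount limit. */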
3183                 adding = cli->cl_max_rpcs_in_flight + 2;
3184                 if (req_count + adding > osc_reqpool_maxreqcount)
3185                         adding = osc_reqpool_maxreqcount - req_count;
3186
3187                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3188                 atomic_add(added, &osc_pool_req_count);
3189         }
3190
3191         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3192         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3193
3194         spin_lock(&osc_shrink_lock);
3195         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3196         spin_unlock(&osc_shrink_lock);
3197
3198         RETURN(0);
3199 }
3200
3201 int osc_precleanup_common(struct obd_device *obd)
3202 {
3203         struct client_obd *cli = &obd->u.cli;
3204         ENTRY;
3205
3206         /* LU-464
3207          * for echo client, export may be on zombie list, wait for
3208          * zombie thread to cull it, because cli.cl_import will be
3209          * cleared in client_disconnect_export():
3210          *   class_export_destroy() -> obd_cleanup() ->
3211          *   echo_device_free() -> echo_client_cleanup() ->
3212          *   obd_disconnect() -> osc_disconnect() ->
3213          *   client_disconnect_export()
3214          */
3215         obd_zombie_barrier();
3216         if (cli->cl_writeback_work) {
3217                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3218                 cli->cl_writeback_work = NULL;
3219         }
3220
3221         if (cli->cl_lru_work) {
3222                 ptlrpcd_destroy_work(cli->cl_lru_work);
3223                 cli->cl_lru_work = NULL;
3224         }
3225
3226         obd_cleanup_client_import(obd);
3227         RETURN(0);
3228 }
3229 EXPORT_SYMBOL(osc_precleanup_common);
3230
3231 static int osc_precleanup(struct obd_device *obd)
3232 {
3233         ENTRY;
3234
3235         osc_precleanup_common(obd);
3236
3237         ptlrpc_lprocfs_unregister_obd(obd);
3238         RETURN(0);
3239 }
3240
3241 int osc_cleanup_common(struct obd_device *obd)
3242 {
3243         struct client_obd *cli = &obd->u.cli;
3244         int rc;
3245
3246         ENTRY;
3247
3248         spin_lock(&osc_shrink_lock);
3249         list_del(&cli->cl_shrink_list);
3250         spin_unlock(&osc_shrink_lock);
3251
3252         /* lru cleanup */
3253         if (cli->cl_cache != NULL) {
3254                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3255                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3256                 list_del_init(&cli->cl_lru_osc);
3257                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3258                 cli->cl_lru_left = NULL;
3259                 cl_cache_decref(cli->cl_cache);
3260                 cli->cl_cache = NULL;
3261         }
3262
3263         /* free memory of osc quota cache */
3264         osc_quota_cleanup(obd);
3265
3266         rc = client_obd_cleanup(obd);
3267
3268         ptlrpcd_decref();
3269         RETURN(rc);
3270 }
3271 EXPORT_SYMBOL(osc_cleanup_common);
3272
3273 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3274 {
3275         ssize_t count  = class_modify_config(lcfg, PARAM_OSC,
3276                                              &obd->obd_kset.kobj);
3277         return count > 0 ? 0 : count;
3278 }
3279
3280 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3281 {
3282         return osc_process_config_base(obd, buf);
3283 }
3284
3285 static struct obd_ops osc_obd_ops = {
3286         .o_owner                = THIS_MODULE,
3287         .o_setup                = osc_setup,
3288         .o_precleanup           = osc_precleanup,
3289         .o_cleanup              = osc_cleanup_common,
3290         .o_add_conn             = client_import_add_conn,
3291         .o_del_conn             = client_import_del_conn,
3292         .o_connect              = client_connect_import,
3293         .o_reconnect            = osc_reconnect,
3294         .o_disconnect           = osc_disconnect,
3295         .o_statfs               = osc_statfs,
3296         .o_statfs_async         = osc_statfs_async,
3297         .o_create               = osc_create,
3298         .o_destroy              = osc_destroy,
3299         .o_getattr              = osc_getattr,
3300         .o_setattr              = osc_setattr,
3301         .o_iocontrol            = osc_iocontrol,
3302         .o_set_info_async       = osc_set_info_async,
3303         .o_import_event         = osc_import_event,
3304         .o_process_config       = osc_process_config,
3305         .o_quotactl             = osc_quotactl,
3306 };
3307
3308 static struct shrinker *osc_cache_shrinker;
3309 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3310 DEFINE_SPINLOCK(osc_shrink_lock);
3311
3312 #ifndef HAVE_SHRINKER_COUNT
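/* Compatibility wrapper for older kernels whose struct shrinker has a single
 * ->shrink() method instead of separate ->count_objects()/->scan_objects()
 * callbacks: run a scan pass, then report how many objects remain. */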
3313 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3314 {
3315         struct shrink_control scv = {
3316                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3317                 .gfp_mask   = shrink_param(sc, gfp_mask)
3318         };
3319 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3320         struct shrinker *shrinker = NULL;
3321 #endif
3322
3323         (void)osc_cache_shrink_scan(shrinker, &scv);
3324
3325         return osc_cache_shrink_count(shrinker, &scv);
3326 }
3327 #endif
3328
3329 static int __init osc_init(void)
3330 {
3331         bool enable_proc = true;
3332         struct obd_type *type;
3333         unsigned int reqpool_size;
3334         unsigned int reqsize;
3335         int rc;
3336         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3337                          osc_cache_shrink_count, osc_cache_shrink_scan);
3338         ENTRY;
3339
3340         /* print the address of _any_ initialized kernel symbol from this
3341          * module, to allow debugging with a gdb that doesn't support data
3342          * symbols from modules. */
3343         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3344
3345         rc = lu_kmem_init(osc_caches);
3346         if (rc)
3347                 RETURN(rc);
3348
3349         type = class_search_type(LUSTRE_OSP_NAME);
3350         if (type != NULL && type->typ_procsym != NULL)
3351                 enable_proc = false;
3352
3353         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3354                                  LUSTRE_OSC_NAME, &osc_device_type);
3355         if (rc)
3356                 GOTO(out_kmem, rc);
3357
3358         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3359
3360         /* This is obviously too much memory; only prevent overflow here */
3361         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3362                 GOTO(out_type, rc = -EINVAL);
3363
3364         reqpool_size = osc_reqpool_mem_max << 20;
3365
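        /* Round the request buffer size up to the next power of two,
         * presumably because allocations are rounded up that way anyway;
         * the pool request count is then the memory budget divided by this
         * effective size (e.g. a 5 MiB budget with 1 MiB requests would
         * allow 5 pooled requests). */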
3366         reqsize = 1;
3367         while (reqsize < OST_IO_MAXREQSIZE)
3368                 reqsize = reqsize << 1;
3369
3370         /*
3371          * We don't enlarge the request count in the OSC pool according to
3372          * cl_max_rpcs_in_flight. Allocation from the pool is only tried after
3373          * normal allocation fails, so a small OSC pool won't cause much
3374          * performance degradation in most cases.
3375          */
3376         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3377
3378         atomic_set(&osc_pool_req_count, 0);
3379         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3380                                           ptlrpc_add_rqs_to_pool);
3381
3382         if (osc_rq_pool != NULL)
3383                 GOTO(out, rc);
3384         rc = -ENOMEM;
3385 out_type:
3386         class_unregister_type(LUSTRE_OSC_NAME);
3387 out_kmem:
3388         lu_kmem_fini(osc_caches);
3389 out:
3390         RETURN(rc);
3391 }
3392
3393 static void __exit osc_exit(void)
3394 {
3395         remove_shrinker(osc_cache_shrinker);
3396         class_unregister_type(LUSTRE_OSC_NAME);
3397         lu_kmem_fini(osc_caches);
3398         ptlrpc_free_rq_pool(osc_rq_pool);
3399 }
3400
3401 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3402 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3403 MODULE_VERSION(LUSTRE_VERSION_STRING);
3404 MODULE_LICENSE("GPL");
3405
3406 module_init(osc_init);
3407 module_exit(osc_exit);