Whamcloud - gitweb
LU-11770 osc: allow build without blk_integrity or crc-t10pi
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <uapi/linux/lustre/lustre_param.h>
45 #include <obd.h>
46 #include <obd_cksum.h>
47 #include <obd_class.h>
48 #include <lustre_osc.h>
49
50 #include "osc_internal.h"
51
52 atomic_t osc_pool_req_count;
53 unsigned int osc_reqpool_maxreqcount;
54 struct ptlrpc_request_pool *osc_rq_pool;
55
56 /* max memory used for request pool, unit is MB */
57 static unsigned int osc_reqpool_mem_max = 5;
58 module_param(osc_reqpool_mem_max, uint, 0444);
59
60 static int osc_idle_timeout = 20;
61 module_param(osc_idle_timeout, uint, 0644);
62
63 #define osc_grant_args osc_brw_async_args
64
/*
 * Async arguments stored in a request by osc_setattr_async() and
 * osc_punch_send(), and consumed by osc_setattr_interpret().
 */
struct osc_setattr_args {
        /* obdo that receives the attributes unpacked from the reply */
        struct obdo             *sa_oa;
        /* completion callback, invoked as sa_upcall(sa_cookie, rc) */
        obd_enqueue_update_f     sa_upcall;
        /* opaque argument passed back to sa_upcall */
        void                    *sa_cookie;
};
70
/*
 * Async arguments stored in a request by osc_sync_base() and consumed by
 * osc_sync_interpret().
 */
struct osc_fsync_args {
        /* object whose blocks attribute is refreshed from the reply */
        struct osc_object       *fa_obj;
        /* obdo overwritten with the obdo carried by the reply */
        struct obdo             *fa_oa;
        /* completion callback, invoked as fa_upcall(fa_cookie, rc) */
        obd_enqueue_update_f    fa_upcall;
        /* opaque argument passed back to fa_upcall */
        void                    *fa_cookie;
};
77
/*
 * Async arguments stored in a request by osc_ladvise_base() and consumed by
 * osc_ladvise_interpret().
 */
struct osc_ladvise_args {
        /* obdo overwritten with the obdo carried by the reply */
        struct obdo             *la_oa;
        /* completion callback, invoked as la_upcall(la_cookie, rc) */
        obd_enqueue_update_f     la_upcall;
        /* opaque argument passed back to la_upcall */
        void                    *la_cookie;
};
83
/*
 * Forward declarations of file-local helpers; their definitions are not in
 * this part of the file.
 */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
87
88 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 {
90         struct ost_body *body;
91
92         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
93         LASSERT(body);
94
95         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
96 }
97
98 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
99                        struct obdo *oa)
100 {
101         struct ptlrpc_request   *req;
102         struct ost_body         *body;
103         int                      rc;
104
105         ENTRY;
106         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
107         if (req == NULL)
108                 RETURN(-ENOMEM);
109
110         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
111         if (rc) {
112                 ptlrpc_request_free(req);
113                 RETURN(rc);
114         }
115
116         osc_pack_req_body(req, oa);
117
118         ptlrpc_request_set_replen(req);
119
120         rc = ptlrpc_queue_wait(req);
121         if (rc)
122                 GOTO(out, rc);
123
124         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125         if (body == NULL)
126                 GOTO(out, rc = -EPROTO);
127
128         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
129         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
130
131         oa->o_blksize = cli_brw_size(exp->exp_obd);
132         oa->o_valid |= OBD_MD_FLBLKSZ;
133
134         EXIT;
135 out:
136         ptlrpc_req_finished(req);
137
138         return rc;
139 }
140
141 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
142                        struct obdo *oa)
143 {
144         struct ptlrpc_request   *req;
145         struct ost_body         *body;
146         int                      rc;
147
148         ENTRY;
149         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150
151         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
152         if (req == NULL)
153                 RETURN(-ENOMEM);
154
155         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156         if (rc) {
157                 ptlrpc_request_free(req);
158                 RETURN(rc);
159         }
160
161         osc_pack_req_body(req, oa);
162
163         ptlrpc_request_set_replen(req);
164
165         rc = ptlrpc_queue_wait(req);
166         if (rc)
167                 GOTO(out, rc);
168
169         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170         if (body == NULL)
171                 GOTO(out, rc = -EPROTO);
172
173         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
174
175         EXIT;
176 out:
177         ptlrpc_req_finished(req);
178
179         RETURN(rc);
180 }
181
182 static int osc_setattr_interpret(const struct lu_env *env,
183                                  struct ptlrpc_request *req, void *args, int rc)
184 {
185         struct osc_setattr_args *sa = args;
186         struct ost_body *body;
187
188         ENTRY;
189
190         if (rc != 0)
191                 GOTO(out, rc);
192
193         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194         if (body == NULL)
195                 GOTO(out, rc = -EPROTO);
196
197         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
198                              &body->oa);
199 out:
200         rc = sa->sa_upcall(sa->sa_cookie, rc);
201         RETURN(rc);
202 }
203
204 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
205                       obd_enqueue_update_f upcall, void *cookie,
206                       struct ptlrpc_request_set *rqset)
207 {
208         struct ptlrpc_request   *req;
209         struct osc_setattr_args *sa;
210         int                      rc;
211
212         ENTRY;
213
214         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
215         if (req == NULL)
216                 RETURN(-ENOMEM);
217
218         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219         if (rc) {
220                 ptlrpc_request_free(req);
221                 RETURN(rc);
222         }
223
224         osc_pack_req_body(req, oa);
225
226         ptlrpc_request_set_replen(req);
227
228         /* do mds to ost setattr asynchronously */
229         if (!rqset) {
230                 /* Do not wait for response. */
231                 ptlrpcd_add_req(req);
232         } else {
233                 req->rq_interpret_reply = osc_setattr_interpret;
234
235                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
236                 sa = ptlrpc_req_async_args(req);
237                 sa->sa_oa = oa;
238                 sa->sa_upcall = upcall;
239                 sa->sa_cookie = cookie;
240
241                 if (rqset == PTLRPCD_SET)
242                         ptlrpcd_add_req(req);
243                 else
244                         ptlrpc_set_add_req(rqset, req);
245         }
246
247         RETURN(0);
248 }
249
250 static int osc_ladvise_interpret(const struct lu_env *env,
251                                  struct ptlrpc_request *req,
252                                  void *arg, int rc)
253 {
254         struct osc_ladvise_args *la = arg;
255         struct ost_body *body;
256         ENTRY;
257
258         if (rc != 0)
259                 GOTO(out, rc);
260
261         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262         if (body == NULL)
263                 GOTO(out, rc = -EPROTO);
264
265         *la->la_oa = body->oa;
266 out:
267         rc = la->la_upcall(la->la_cookie, rc);
268         RETURN(rc);
269 }
270
271 /**
272  * If rqset is NULL, do not wait for response. Upcall and cookie could also
273  * be NULL in this case
274  */
275 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
276                      struct ladvise_hdr *ladvise_hdr,
277                      obd_enqueue_update_f upcall, void *cookie,
278                      struct ptlrpc_request_set *rqset)
279 {
280         struct ptlrpc_request   *req;
281         struct ost_body         *body;
282         struct osc_ladvise_args *la;
283         int                      rc;
284         struct lu_ladvise       *req_ladvise;
285         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
286         int                      num_advise = ladvise_hdr->lah_count;
287         struct ladvise_hdr      *req_ladvise_hdr;
288         ENTRY;
289
290         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
291         if (req == NULL)
292                 RETURN(-ENOMEM);
293
294         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
295                              num_advise * sizeof(*ladvise));
296         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297         if (rc != 0) {
298                 ptlrpc_request_free(req);
299                 RETURN(rc);
300         }
301         req->rq_request_portal = OST_IO_PORTAL;
302         ptlrpc_at_set_req_timeout(req);
303
304         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305         LASSERT(body);
306         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
307                              oa);
308
309         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
310                                                  &RMF_OST_LADVISE_HDR);
311         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312
313         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
314         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
315         ptlrpc_request_set_replen(req);
316
317         if (rqset == NULL) {
318                 /* Do not wait for response. */
319                 ptlrpcd_add_req(req);
320                 RETURN(0);
321         }
322
323         req->rq_interpret_reply = osc_ladvise_interpret;
324         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
325         la = ptlrpc_req_async_args(req);
326         la->la_oa = oa;
327         la->la_upcall = upcall;
328         la->la_cookie = cookie;
329
330         if (rqset == PTLRPCD_SET)
331                 ptlrpcd_add_req(req);
332         else
333                 ptlrpc_set_add_req(rqset, req);
334
335         RETURN(0);
336 }
337
338 static int osc_create(const struct lu_env *env, struct obd_export *exp,
339                       struct obdo *oa)
340 {
341         struct ptlrpc_request *req;
342         struct ost_body       *body;
343         int                    rc;
344         ENTRY;
345
346         LASSERT(oa != NULL);
347         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
348         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349
350         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351         if (req == NULL)
352                 GOTO(out, rc = -ENOMEM);
353
354         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355         if (rc) {
356                 ptlrpc_request_free(req);
357                 GOTO(out, rc);
358         }
359
360         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
361         LASSERT(body);
362
363         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364
365         ptlrpc_request_set_replen(req);
366
367         rc = ptlrpc_queue_wait(req);
368         if (rc)
369                 GOTO(out_req, rc);
370
371         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372         if (body == NULL)
373                 GOTO(out_req, rc = -EPROTO);
374
375         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
376         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
377
378         oa->o_blksize = cli_brw_size(exp->exp_obd);
379         oa->o_valid |= OBD_MD_FLBLKSZ;
380
381         CDEBUG(D_HA, "transno: %lld\n",
382                lustre_msg_get_transno(req->rq_repmsg));
383 out_req:
384         ptlrpc_req_finished(req);
385 out:
386         RETURN(rc);
387 }
388
389 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
390                    obd_enqueue_update_f upcall, void *cookie)
391 {
392         struct ptlrpc_request *req;
393         struct osc_setattr_args *sa;
394         struct obd_import *imp = class_exp2cliimp(exp);
395         struct ost_body *body;
396         int rc;
397
398         ENTRY;
399
400         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
401         if (req == NULL)
402                 RETURN(-ENOMEM);
403
404         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405         if (rc < 0) {
406                 ptlrpc_request_free(req);
407                 RETURN(rc);
408         }
409
410         osc_set_io_portal(req);
411
412         ptlrpc_at_set_req_timeout(req);
413
414         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415
416         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417
418         ptlrpc_request_set_replen(req);
419
420         req->rq_interpret_reply = osc_setattr_interpret;
421         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
422         sa = ptlrpc_req_async_args(req);
423         sa->sa_oa = oa;
424         sa->sa_upcall = upcall;
425         sa->sa_cookie = cookie;
426
427         ptlrpcd_add_req(req);
428
429         RETURN(0);
430 }
431 EXPORT_SYMBOL(osc_punch_send);
432
433 static int osc_sync_interpret(const struct lu_env *env,
434                               struct ptlrpc_request *req, void *args, int rc)
435 {
436         struct osc_fsync_args *fa = args;
437         struct ost_body *body;
438         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
439         unsigned long valid = 0;
440         struct cl_object *obj;
441         ENTRY;
442
443         if (rc != 0)
444                 GOTO(out, rc);
445
446         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447         if (body == NULL) {
448                 CERROR("can't unpack ost_body\n");
449                 GOTO(out, rc = -EPROTO);
450         }
451
452         *fa->fa_oa = body->oa;
453         obj = osc2cl(fa->fa_obj);
454
455         /* Update osc object's blocks attribute */
456         cl_object_attr_lock(obj);
457         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
458                 attr->cat_blocks = body->oa.o_blocks;
459                 valid |= CAT_BLOCKS;
460         }
461
462         if (valid != 0)
463                 cl_object_attr_update(env, obj, attr, valid);
464         cl_object_attr_unlock(obj);
465
466 out:
467         rc = fa->fa_upcall(fa->fa_cookie, rc);
468         RETURN(rc);
469 }
470
471 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
472                   obd_enqueue_update_f upcall, void *cookie,
473                   struct ptlrpc_request_set *rqset)
474 {
475         struct obd_export     *exp = osc_export(obj);
476         struct ptlrpc_request *req;
477         struct ost_body       *body;
478         struct osc_fsync_args *fa;
479         int                    rc;
480         ENTRY;
481
482         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
483         if (req == NULL)
484                 RETURN(-ENOMEM);
485
486         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
487         if (rc) {
488                 ptlrpc_request_free(req);
489                 RETURN(rc);
490         }
491
492         /* overload the size and blocks fields in the oa with start/end */
493         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494         LASSERT(body);
495         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
496
497         ptlrpc_request_set_replen(req);
498         req->rq_interpret_reply = osc_sync_interpret;
499
500         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
501         fa = ptlrpc_req_async_args(req);
502         fa->fa_obj = obj;
503         fa->fa_oa = oa;
504         fa->fa_upcall = upcall;
505         fa->fa_cookie = cookie;
506
507         if (rqset == PTLRPCD_SET)
508                 ptlrpcd_add_req(req);
509         else
510                 ptlrpc_set_add_req(rqset, req);
511
512         RETURN (0);
513 }
514
515 /* Find and cancel locally locks matched by @mode in the resource found by
516  * @objid. Found locks are added into @cancel list. Returns the amount of
517  * locks added to @cancels list. */
518 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
519                                    struct list_head *cancels,
520                                    enum ldlm_mode mode, __u64 lock_flags)
521 {
522         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
523         struct ldlm_res_id res_id;
524         struct ldlm_resource *res;
525         int count;
526         ENTRY;
527
528         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
529          * export) but disabled through procfs (flag in NS).
530          *
531          * This distinguishes from a case when ELC is not supported originally,
532          * when we still want to cancel locks in advance and just cancel them
533          * locally, without sending any RPC. */
534         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
535                 RETURN(0);
536
537         ostid_build_res_name(&oa->o_oi, &res_id);
538         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
539         if (IS_ERR(res))
540                 RETURN(0);
541
542         LDLM_RESOURCE_ADDREF(res);
543         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
544                                            lock_flags, 0, NULL);
545         LDLM_RESOURCE_DELREF(res);
546         ldlm_resource_putref(res);
547         RETURN(count);
548 }
549
550 static int osc_destroy_interpret(const struct lu_env *env,
551                                  struct ptlrpc_request *req, void *args, int rc)
552 {
553         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554
555         atomic_dec(&cli->cl_destroy_in_flight);
556         wake_up(&cli->cl_destroy_waitq);
557
558         return 0;
559 }
560
561 static int osc_can_send_destroy(struct client_obd *cli)
562 {
563         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
564             cli->cl_max_rpcs_in_flight) {
565                 /* The destroy request can be sent */
566                 return 1;
567         }
568         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
569             cli->cl_max_rpcs_in_flight) {
570                 /*
571                  * The counter has been modified between the two atomic
572                  * operations.
573                  */
574                 wake_up(&cli->cl_destroy_waitq);
575         }
576         return 0;
577 }
578
579 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
580                        struct obdo *oa)
581 {
582         struct client_obd     *cli = &exp->exp_obd->u.cli;
583         struct ptlrpc_request *req;
584         struct ost_body       *body;
585         struct list_head       cancels = LIST_HEAD_INIT(cancels);
586         int rc, count;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
595                                         LDLM_FL_DISCARD_DATA);
596
597         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
598         if (req == NULL) {
599                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
600                 RETURN(-ENOMEM);
601         }
602
603         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
604                                0, &cancels, count);
605         if (rc) {
606                 ptlrpc_request_free(req);
607                 RETURN(rc);
608         }
609
610         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
611         ptlrpc_at_set_req_timeout(req);
612
613         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614         LASSERT(body);
615         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616
617         ptlrpc_request_set_replen(req);
618
619         req->rq_interpret_reply = osc_destroy_interpret;
620         if (!osc_can_send_destroy(cli)) {
621                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
622
623                 /*
624                  * Wait until the number of on-going destroy RPCs drops
625                  * under max_rpc_in_flight
626                  */
627                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
628                                             osc_can_send_destroy(cli), &lwi);
629                 if (rc) {
630                         ptlrpc_req_finished(req);
631                         RETURN(rc);
632                 }
633         }
634
635         /* Do not wait for response */
636         ptlrpcd_add_req(req);
637         RETURN(0);
638 }
639
/**
 * Fill the grant/dirty accounting fields of \a oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the state of \a cli.
 *
 * OBD_MD_FLBLOCKS and OBD_MD_FLGRANT must not already be set in o_valid;
 * both are set here. The client fields are read, and cl_lost_grant is reset
 * to zero, under cl_loi_list_lock.
 *
 * \param[in]     cli            client whose accounting is reported
 * \param[in,out] oa             obdo to fill
 * \param[in]     writing_bytes  not used by this function
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        /* with GRANT_PARAM report cl_dirty_grant, else dirty pages in bytes */
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        /* The first three branches are sanity checks: each one logs an error
         * and reports o_undirty = 0 instead of computing a request. */
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic reads here and the atomic increments elsewhere
                 * are not covered by a lock, so they may race; the small
                 * fudge factor (+1) keeps that race from tripping this
                 * CERROR(). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                /* pages for (max RPCs in flight + 1) full RPCs, but at least
                 * cl_dirty_max_pages */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT -
                                    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* report lost grant once, then clear it */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
706
707 void osc_update_next_shrink(struct client_obd *cli)
708 {
709         cli->cl_next_shrink_grant = ktime_get_seconds() +
710                                     cli->cl_grant_shrink_interval;
711
712         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
713                cli->cl_next_shrink_grant);
714 }
715
/**
 * Add \a grant to the client's available grant, under cl_loi_list_lock.
 */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
722
723 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
724 {
725         if (body->oa.o_valid & OBD_MD_FLGRANT) {
726                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
727                 __osc_update_grant(cli, body->oa.o_grant);
728         }
729 }
730
/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        /* clients to examine, linked through client_obd::cl_grant_chain */
        struct list_head        gtd_clients;
        /* held by the grant work handler while it walks gtd_clients */
        struct mutex            gtd_mutex;
        /* when set, the grant work handler no longer reschedules itself */
        unsigned long           gtd_stopped:1;
};
/* the single instance used by the grant shrink code in this file */
static struct grant_thread_data client_gtd;
740
741 static int osc_shrink_grant_interpret(const struct lu_env *env,
742                                       struct ptlrpc_request *req,
743                                       void *args, int rc)
744 {
745         struct osc_grant_args *aa = args;
746         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
747         struct ost_body *body;
748
749         if (rc != 0) {
750                 __osc_update_grant(cli, aa->aa_oa->o_grant);
751                 GOTO(out, rc);
752         }
753
754         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
755         LASSERT(body);
756         osc_update_grant(cli, body);
757 out:
758         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
759
760         return rc;
761 }
762
763 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
764 {
765         spin_lock(&cli->cl_loi_list_lock);
766         oa->o_grant = cli->cl_avail_grant / 4;
767         cli->cl_avail_grant -= oa->o_grant;
768         spin_unlock(&cli->cl_loi_list_lock);
769         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
770                 oa->o_valid |= OBD_MD_FLFLAGS;
771                 oa->o_flags = 0;
772         }
773         oa->o_flags |= OBD_FL_SHRINK_GRANT;
774         osc_update_next_shrink(cli);
775 }
776
777 /* Shrink the current grant, either from some large amount to enough for a
778  * full set of in-flight RPCs, or if we have already shrunk to that limit
779  * then to enough for a single RPC.  This avoids keeping more grant than
780  * needed, and avoids shrinking the grant piecemeal. */
781 static int osc_shrink_grant(struct client_obd *cli)
782 {
783         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
784                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
785
786         spin_lock(&cli->cl_loi_list_lock);
787         if (cli->cl_avail_grant <= target_bytes)
788                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
789         spin_unlock(&cli->cl_loi_list_lock);
790
791         return osc_shrink_grant_to_target(cli, target_bytes);
792 }
793
794 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
795 {
796         int                     rc = 0;
797         struct ost_body        *body;
798         ENTRY;
799
800         spin_lock(&cli->cl_loi_list_lock);
801         /* Don't shrink if we are already above or below the desired limit
802          * We don't want to shrink below a single RPC, as that will negatively
803          * impact block allocation and long-term performance. */
804         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
805                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
806
807         if (target_bytes >= cli->cl_avail_grant) {
808                 spin_unlock(&cli->cl_loi_list_lock);
809                 RETURN(0);
810         }
811         spin_unlock(&cli->cl_loi_list_lock);
812
813         OBD_ALLOC_PTR(body);
814         if (!body)
815                 RETURN(-ENOMEM);
816
817         osc_announce_cached(cli, &body->oa, 0);
818
819         spin_lock(&cli->cl_loi_list_lock);
820         if (target_bytes >= cli->cl_avail_grant) {
821                 /* available grant has changed since target calculation */
822                 spin_unlock(&cli->cl_loi_list_lock);
823                 GOTO(out_free, rc = 0);
824         }
825         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
826         cli->cl_avail_grant = target_bytes;
827         spin_unlock(&cli->cl_loi_list_lock);
828         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
829                 body->oa.o_valid |= OBD_MD_FLFLAGS;
830                 body->oa.o_flags = 0;
831         }
832         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
833         osc_update_next_shrink(cli);
834
835         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
836                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
837                                 sizeof(*body), body, NULL);
838         if (rc != 0)
839                 __osc_update_grant(cli, body->oa.o_grant);
840 out_free:
841         OBD_FREE_PTR(body);
842         RETURN(rc);
843 }
844
845 static int osc_should_shrink_grant(struct client_obd *client)
846 {
847         time64_t next_shrink = client->cl_next_shrink_grant;
848
849         if (client->cl_import == NULL)
850                 return 0;
851
852         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
853              OBD_CONNECT_GRANT_SHRINK) == 0)
854                 return 0;
855
856         if (ktime_get_seconds() >= next_shrink - 5) {
857                 /* Get the current RPC size directly, instead of going via:
858                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
859                  * Keep comment here so that it can be found by searching. */
860                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
861
862                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
863                     client->cl_avail_grant > brw_size)
864                         return 1;
865                 else
866                         osc_update_next_shrink(client);
867         }
868         return 0;
869 }
870
/* Batch limit consulted by osc_grant_work_handler() to cap how much grant
 * shrinking a single pass over the client list may trigger. */
871 #define GRANT_SHRINK_RPC_BATCH  100
872
/* Work item for the grant shrink handler; initialized and first queued in
 * osc_start_grant_work(), cancelled in osc_stop_grant_work(). */
873 static struct delayed_work work;
874
875 static void osc_grant_work_handler(struct work_struct *data)
876 {
877         struct client_obd *cli;
878         int rpc_sent;
879         bool init_next_shrink = true;
880         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
881
882         rpc_sent = 0;
883         mutex_lock(&client_gtd.gtd_mutex);
884         list_for_each_entry(cli, &client_gtd.gtd_clients,
885                             cl_grant_chain) {
886                 if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
887                     osc_should_shrink_grant(cli))
888                         osc_shrink_grant(cli);
889
890                 if (!init_next_shrink) {
891                         if (cli->cl_next_shrink_grant < next_shrink &&
892                             cli->cl_next_shrink_grant > ktime_get_seconds())
893                                 next_shrink = cli->cl_next_shrink_grant;
894                 } else {
895                         init_next_shrink = false;
896                         next_shrink = cli->cl_next_shrink_grant;
897                 }
898         }
899         mutex_unlock(&client_gtd.gtd_mutex);
900
901         if (client_gtd.gtd_stopped == 1)
902                 return;
903
904         if (next_shrink > ktime_get_seconds())
905                 schedule_delayed_work(&work, msecs_to_jiffies(
906                                         (next_shrink - ktime_get_seconds()) *
907                                         MSEC_PER_SEC));
908         else
909                 schedule_work(&work.work);
910 }
911
912 /**
913  * Start grant thread for returing grant to server for idle clients.
914  */
915 static int osc_start_grant_work(void)
916 {
917         client_gtd.gtd_stopped = 0;
918         mutex_init(&client_gtd.gtd_mutex);
919         INIT_LIST_HEAD(&client_gtd.gtd_clients);
920
921         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
922         schedule_work(&work.work);
923
924         return 0;
925 }
926
927 static void osc_stop_grant_work(void)
928 {
929         client_gtd.gtd_stopped = 1;
930         cancel_delayed_work_sync(&work);
931 }
932
933 static void osc_add_grant_list(struct client_obd *client)
934 {
935         mutex_lock(&client_gtd.gtd_mutex);
936         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
937         mutex_unlock(&client_gtd.gtd_mutex);
938 }
939
940 static void osc_del_grant_list(struct client_obd *client)
941 {
942         if (list_empty(&client->cl_grant_chain))
943                 return;
944
945         mutex_lock(&client_gtd.gtd_mutex);
946         list_del_init(&client->cl_grant_chain);
947         mutex_unlock(&client_gtd.gtd_mutex);
948 }
949
950 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
951 {
952         /*
953          * ocd_grant is the total grant amount we're expect to hold: if we've
954          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
955          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
956          * dirty.
957          *
958          * race is tolerable here: if we're evicted, but imp_state already
959          * left EVICTED state, then cl_dirty_pages must be 0 already.
960          */
961         spin_lock(&cli->cl_loi_list_lock);
962         cli->cl_avail_grant = ocd->ocd_grant;
963         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
964                 cli->cl_avail_grant -= cli->cl_reserved_grant;
965                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
966                         cli->cl_avail_grant -= cli->cl_dirty_grant;
967                 else
968                         cli->cl_avail_grant -=
969                                         cli->cl_dirty_pages << PAGE_SHIFT;
970         }
971
972         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
973                 u64 size;
974                 int chunk_mask;
975
976                 /* overhead for each extent insertion */
977                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
978                 /* determine the appropriate chunk size used by osc_extent. */
979                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
980                                           ocd->ocd_grant_blkbits);
981                 /* max_pages_per_rpc must be chunk aligned */
982                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
983                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
984                                              ~chunk_mask) & chunk_mask;
985                 /* determine maximum extent size, in #pages */
986                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
987                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
988                 if (cli->cl_max_extent_pages == 0)
989                         cli->cl_max_extent_pages = 1;
990         } else {
991                 cli->cl_grant_extent_tax = 0;
992                 cli->cl_chunkbits = PAGE_SHIFT;
993                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
994         }
995         spin_unlock(&cli->cl_loi_list_lock);
996
997         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
998                 "chunk bits: %d cl_max_extent_pages: %d\n",
999                 cli_name(cli),
1000                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1001                 cli->cl_max_extent_pages);
1002
1003         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1004                 osc_add_grant_list(cli);
1005 }
1006 EXPORT_SYMBOL(osc_init_grant);
1007
1008 /* We assume that the reason this OSC got a short read is because it read
1009  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1010  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1011  * this stripe never got written at or beyond this stripe offset yet. */
1012 static void handle_short_read(int nob_read, size_t page_count,
1013                               struct brw_page **pga)
1014 {
1015         char *ptr;
1016         int i = 0;
1017
1018         /* skip bytes read OK */
1019         while (nob_read > 0) {
1020                 LASSERT (page_count > 0);
1021
1022                 if (pga[i]->count > nob_read) {
1023                         /* EOF inside this page */
1024                         ptr = kmap(pga[i]->pg) +
1025                                 (pga[i]->off & ~PAGE_MASK);
1026                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1027                         kunmap(pga[i]->pg);
1028                         page_count--;
1029                         i++;
1030                         break;
1031                 }
1032
1033                 nob_read -= pga[i]->count;
1034                 page_count--;
1035                 i++;
1036         }
1037
1038         /* zero remaining pages */
1039         while (page_count-- > 0) {
1040                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1041                 memset(ptr, 0, pga[i]->count);
1042                 kunmap(pga[i]->pg);
1043                 i++;
1044         }
1045 }
1046
1047 static int check_write_rcs(struct ptlrpc_request *req,
1048                            int requested_nob, int niocount,
1049                            size_t page_count, struct brw_page **pga)
1050 {
1051         int     i;
1052         __u32   *remote_rcs;
1053
1054         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1055                                                   sizeof(*remote_rcs) *
1056                                                   niocount);
1057         if (remote_rcs == NULL) {
1058                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1059                 return(-EPROTO);
1060         }
1061
1062         /* return error if any niobuf was in error */
1063         for (i = 0; i < niocount; i++) {
1064                 if ((int)remote_rcs[i] < 0)
1065                         return(remote_rcs[i]);
1066
1067                 if (remote_rcs[i] != 0) {
1068                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1069                                 i, remote_rcs[i], req);
1070                         return(-EPROTO);
1071                 }
1072         }
1073         if (req->rq_bulk != NULL &&
1074             req->rq_bulk->bd_nob_transferred != requested_nob) {
1075                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1076                        req->rq_bulk->bd_nob_transferred, requested_nob);
1077                 return(-EPROTO);
1078         }
1079
1080         return (0);
1081 }
1082
1083 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1084 {
1085         if (p1->flag != p2->flag) {
1086                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1087                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1088                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1089
1090                 /* warn if we try to combine flags that we don't know to be
1091                  * safe to combine */
1092                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1093                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1094                               "report this at https://jira.whamcloud.com/\n",
1095                               p1->flag, p2->flag);
1096                 }
1097                 return 0;
1098         }
1099
1100         return (p1->off + p1->count == p2->off);
1101 }
1102
1103 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1104 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1105                                    size_t pg_count, struct brw_page **pga,
1106                                    int opc, obd_dif_csum_fn *fn,
1107                                    int sector_size,
1108                                    u32 *check_sum)
1109 {
1110         struct ahash_request *req;
1111         /* Used Adler as the default checksum type on top of DIF tags */
1112         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1113         struct page *__page;
1114         unsigned char *buffer;
1115         __u16 *guard_start;
1116         unsigned int bufsize;
1117         int guard_number;
1118         int used_number = 0;
1119         int used;
1120         u32 cksum;
1121         int rc = 0;
1122         int i = 0;
1123
1124         LASSERT(pg_count > 0);
1125
1126         __page = alloc_page(GFP_KERNEL);
1127         if (__page == NULL)
1128                 return -ENOMEM;
1129
1130         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1131         if (IS_ERR(req)) {
1132                 rc = PTR_ERR(req);
1133                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1134                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1135                 GOTO(out, rc);
1136         }
1137
1138         buffer = kmap(__page);
1139         guard_start = (__u16 *)buffer;
1140         guard_number = PAGE_SIZE / sizeof(*guard_start);
1141         while (nob > 0 && pg_count > 0) {
1142                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1143
1144                 /* corrupt the data before we compute the checksum, to
1145                  * simulate an OST->client data error */
1146                 if (unlikely(i == 0 && opc == OST_READ &&
1147                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1148                         unsigned char *ptr = kmap(pga[i]->pg);
1149                         int off = pga[i]->off & ~PAGE_MASK;
1150
1151                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1152                         kunmap(pga[i]->pg);
1153                 }
1154
1155                 /*
1156                  * The left guard number should be able to hold checksums of a
1157                  * whole page
1158                  */
1159                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1160                                                   pga[i]->off & ~PAGE_MASK,
1161                                                   count,
1162                                                   guard_start + used_number,
1163                                                   guard_number - used_number,
1164                                                   &used, sector_size,
1165                                                   fn);
1166                 if (rc)
1167                         break;
1168
1169                 used_number += used;
1170                 if (used_number == guard_number) {
1171                         cfs_crypto_hash_update_page(req, __page, 0,
1172                                 used_number * sizeof(*guard_start));
1173                         used_number = 0;
1174                 }
1175
1176                 nob -= pga[i]->count;
1177                 pg_count--;
1178                 i++;
1179         }
1180         kunmap(__page);
1181         if (rc)
1182                 GOTO(out, rc);
1183
1184         if (used_number != 0)
1185                 cfs_crypto_hash_update_page(req, __page, 0,
1186                         used_number * sizeof(*guard_start));
1187
1188         bufsize = sizeof(cksum);
1189         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1190
1191         /* For sending we only compute the wrong checksum instead
1192          * of corrupting the data so it is still correct on a redo */
1193         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1194                 cksum++;
1195
1196         *check_sum = cksum;
1197 out:
1198         __free_page(__page);
1199         return rc;
1200 }
1201 #else /* !CONFIG_CRC_T10DIF */
1202 #define obd_dif_ip_fn NULL
1203 #define obd_dif_crc_fn NULL
1204 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1205         -EOPNOTSUPP
1206 #endif /* CONFIG_CRC_T10DIF */
1207
1208 static int osc_checksum_bulk(int nob, size_t pg_count,
1209                              struct brw_page **pga, int opc,
1210                              enum cksum_types cksum_type,
1211                              u32 *cksum)
1212 {
1213         int                             i = 0;
1214         struct ahash_request           *req;
1215         unsigned int                    bufsize;
1216         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1217
1218         LASSERT(pg_count > 0);
1219
1220         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1221         if (IS_ERR(req)) {
1222                 CERROR("Unable to initialize checksum hash %s\n",
1223                        cfs_crypto_hash_name(cfs_alg));
1224                 return PTR_ERR(req);
1225         }
1226
1227         while (nob > 0 && pg_count > 0) {
1228                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1229
1230                 /* corrupt the data before we compute the checksum, to
1231                  * simulate an OST->client data error */
1232                 if (i == 0 && opc == OST_READ &&
1233                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1234                         unsigned char *ptr = kmap(pga[i]->pg);
1235                         int off = pga[i]->off & ~PAGE_MASK;
1236
1237                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1238                         kunmap(pga[i]->pg);
1239                 }
1240                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1241                                             pga[i]->off & ~PAGE_MASK,
1242                                             count);
1243                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1244                                (int)(pga[i]->off & ~PAGE_MASK));
1245
1246                 nob -= pga[i]->count;
1247                 pg_count--;
1248                 i++;
1249         }
1250
1251         bufsize = sizeof(*cksum);
1252         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1253
1254         /* For sending we only compute the wrong checksum instead
1255          * of corrupting the data so it is still correct on a redo */
1256         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1257                 (*cksum)++;
1258
1259         return 0;
1260 }
1261
1262 static int osc_checksum_bulk_rw(const char *obd_name,
1263                                 enum cksum_types cksum_type,
1264                                 int nob, size_t pg_count,
1265                                 struct brw_page **pga, int opc,
1266                                 u32 *check_sum)
1267 {
1268         obd_dif_csum_fn *fn = NULL;
1269         int sector_size = 0;
1270         int rc;
1271
1272         ENTRY;
1273         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1274
1275         if (fn)
1276                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1277                                              opc, fn, sector_size, check_sum);
1278         else
1279                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1280                                        check_sum);
1281
1282         RETURN(rc);
1283 }
1284
/**
 * Build a bulk read or write (BRW) request for an array of pages.
 *
 * Allocates an OST_WRITE request from osc_rq_pool or an OST_READ request,
 * sizes the request capsule, and fills in the OST body, the I/O object and
 * one remote niobuf per run of mergeable pages.  The data is either attached
 * through a bulk descriptor or, when the transfer qualifies as short io,
 * carried in the RMF_SHORT_IO buffer of the request (writes) or reply
 * (reads).  If checksums are enabled and the security flavor does not already
 * cover bulk data, the checksum type is packed into the body and, for writes,
 * the checksum is computed here.  Per-request state is stored in the
 * request's async args.
 *
 * \param[in]     cmd		OBD_BRW_WRITE bit selects write, else read
 * \param[in]     cli		client issuing the request
 * \param[in,out] oa		object attributes; checksum fields are
 *				updated for writes
 * \param[in]     page_count	number of entries in \a pga, must be > 0
 * \param[in]     pga		pages to transfer, in ascending offset order
 * \param[out]    reqp		set to the prepared request on success
 * \param[in]     resend	non-zero marks the request OBD_FL_RECOV_RESEND
 *
 * \retval 0		success, \a *reqp is valid
 * \retval -ENOMEM	request or bulk descriptor allocation failed (also
 *			returned by fault injection)
 * \retval -EINVAL	fault injection only
 * \retval negative	error from ptlrpc_request_pack() or from the write
 *			checksum computation
 */
1285 static int
1286 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1287                      u32 page_count, struct brw_page **pga,
1288                      struct ptlrpc_request **reqp, int resend)
1289 {
1290         struct ptlrpc_request   *req;
1291         struct ptlrpc_bulk_desc *desc;
1292         struct ost_body         *body;
1293         struct obd_ioobj        *ioobj;
1294         struct niobuf_remote    *niobuf;
1295         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1296         struct osc_brw_async_args *aa;
1297         struct req_capsule      *pill;
1298         struct brw_page *pg_prev;
1299         void *short_io_buf;
1300         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1301
1302         ENTRY;
1303         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1304                 RETURN(-ENOMEM); /* Recoverable */
1305         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1306                 RETURN(-EINVAL); /* Fatal */
1307
        /* writes draw their request from the preallocated osc_rq_pool;
         * reads use a regular allocation */
1308         if ((cmd & OBD_BRW_WRITE) != 0) {
1309                 opc = OST_WRITE;
1310                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1311                                                 osc_rq_pool,
1312                                                 &RQF_OST_BRW_WRITE);
1313         } else {
1314                 opc = OST_READ;
1315                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1316         }
1317         if (req == NULL)
1318                 RETURN(-ENOMEM);
1319
        /* count the remote niobufs: every page that cannot be merged with
         * its predecessor starts a new one */
1320         for (niocount = i = 1; i < page_count; i++) {
1321                 if (!can_merge_pages(pga[i - 1], pga[i]))
1322                         niocount++;
1323         }
1324
1325         pill = &req->rq_pill;
1326         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1327                              sizeof(*ioobj));
1328         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1329                              niocount * sizeof(*niobuf));
1330
        /* total payload size; reset to 0 below if short io is not usable */
1331         for (i = 0; i < page_count; i++)
1332                 short_io_size += pga[i]->count;
1333
1334         /* Check if read/write is small enough to be a short io. */
1335         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1336             !imp_connect_shortio(cli->cl_import))
1337                 short_io_size = 0;
1338
        /* short io data travels in the request for writes and in the reply
         * for reads */
1339         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1340                              opc == OST_READ ? 0 : short_io_size);
1341         if (opc == OST_READ)
1342                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1343                                      short_io_size);
1344
1345         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1346         if (rc) {
1347                 ptlrpc_request_free(req);
1348                 RETURN(rc);
1349         }
1350         osc_set_io_portal(req);
1351
1352         ptlrpc_at_set_req_timeout(req);
1353         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1354          * retry logic */
1355         req->rq_no_retry_einprogress = 1;
1356
        /* short io needs no bulk descriptor */
1357         if (short_io_size != 0) {
1358                 desc = NULL;
1359                 short_io_buf = NULL;
1360                 goto no_bulk;
1361         }
1362
1363         desc = ptlrpc_prep_bulk_imp(req, page_count,
1364                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1365                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1366                         PTLRPC_BULK_PUT_SINK) |
1367                         PTLRPC_BULK_BUF_KIOV,
1368                 OST_BULK_PORTAL,
1369                 &ptlrpc_bulk_kiov_pin_ops);
1370
1371         if (desc == NULL)
1372                 GOTO(out, rc = -ENOMEM);
1373         /* NB request now owns desc and will free it when it gets freed */
1374 no_bulk:
1375         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1376         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1377         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1378         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1379
1380         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1381
1382         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1383          * and from_kgid(), because they are asynchronous. Fortunately, variable
1384          * oa contains valid o_uid and o_gid in these two operations.
1385          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1386          * OBD_MD_FLUID and OBD_MD_FLGID is not set in order to avoid breaking
1387          * other process logic */
1388         body->oa.o_uid = oa->o_uid;
1389         body->oa.o_gid = oa->o_gid;
1390
1391         obdo_to_ioobj(oa, ioobj);
1392         ioobj->ioo_bufcnt = niocount;
1393         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1394          * that might be send for this request.  The actual number is decided
1395          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1396          * "max - 1" for old client compatibility sending "0", and also so the
1397          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1398         if (desc != NULL)
1399                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1400         else /* short io */
1401                 ioobj_max_brw_set(ioobj, 0);
1402
1403         if (short_io_size != 0) {
1404                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1406                         body->oa.o_flags = 0;
1407                 }
1408                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1409                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1410                        short_io_size);
1411                 if (opc == OST_WRITE) {
1412                         short_io_buf = req_capsule_client_get(pill,
1413                                                               &RMF_SHORT_IO);
1414                         LASSERT(short_io_buf != NULL);
1415                 }
1416         }
1417
1418         LASSERT(page_count > 0);
1419         pg_prev = pga[0];
        /* attach every page to the transfer and describe it in the niobuf
         * array; niobuf advances once per iteration and is stepped back
         * when a page is merged into the previous entry */
1420         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1421                 struct brw_page *pg = pga[i];
1422                 int poff = pg->off & ~PAGE_MASK;
1423
1424                 LASSERT(pg->count > 0);
1425                 /* make sure there is no gap in the middle of page array */
1426                 LASSERTF(page_count == 1 ||
1427                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1428                           ergo(i > 0 && i < page_count - 1,
1429                                poff == 0 && pg->count == PAGE_SIZE)   &&
1430                           ergo(i == page_count - 1, poff == 0)),
1431                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1432                          i, page_count, pg, pg->off, pg->count);
1433                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1434                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1435                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1436                          i, page_count,
1437                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1438                          pg_prev->pg, page_private(pg_prev->pg),
1439                          pg_prev->pg->index, pg_prev->off);
1440                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1441                         (pg->flag & OBD_BRW_SRVLOCK));
                /* short io write: copy the data into the request itself;
                 * bulk: add the page to the descriptor; short io read:
                 * nothing to attach here */
1442                 if (short_io_size != 0 && opc == OST_WRITE) {
1443                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1444
1445                         LASSERT(short_io_size >= requested_nob + pg->count);
1446                         memcpy(short_io_buf + requested_nob,
1447                                ptr + poff,
1448                                pg->count);
1449                         ll_kunmap_atomic(ptr, KM_USER0);
1450                 } else if (short_io_size == 0) {
1451                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1452                                                          pg->count);
1453                 }
1454                 requested_nob += pg->count;
1455
1456                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1457                         niobuf--;
1458                         niobuf->rnb_len += pg->count;
1459                 } else {
1460                         niobuf->rnb_offset = pg->off;
1461                         niobuf->rnb_len    = pg->count;
1462                         niobuf->rnb_flags  = pg->flag;
1463                 }
1464                 pg_prev = pg;
1465         }
1466
        /* the loop must have consumed exactly niocount niobufs */
1467         LASSERTF((void *)(niobuf - niocount) ==
1468                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1469                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1470                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1471
1472         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1473         if (resend) {
1474                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1475                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1476                         body->oa.o_flags = 0;
1477                 }
1478                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1479         }
1480
1481         if (osc_should_shrink_grant(cli))
1482                 osc_shrink_grant_local(cli, &body->oa);
1483
1484         /* size[REQ_REC_OFF] still sizeof (*body) */
1485         if (opc == OST_WRITE) {
1486                 if (cli->cl_checksum &&
1487                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1488                         /* store cl_cksum_type in a local variable since
1489                          * it can be changed via lprocfs */
1490                         enum cksum_types cksum_type = cli->cl_cksum_type;
1491
1492                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1493                                 body->oa.o_flags = 0;
1494
1495                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1496                                                                 cksum_type);
1497                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1498
1499                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1500                                                   requested_nob, page_count,
1501                                                   pga, OST_WRITE,
1502                                                   &body->oa.o_cksum);
1503                         if (rc < 0) {
1504                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1505                                        rc);
1506                                 GOTO(out, rc);
1507                         }
1508                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1509                                body->oa.o_cksum);
1510
1511                         /* save this in 'oa', too, for later checking */
1512                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1513                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1514                                                            cksum_type);
1515                 } else {
1516                         /* clear out the checksum flag, in case this is a
1517                          * resend but cl_checksum is no longer set. b=11238 */
1518                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1519                 }
1520                 oa->o_cksum = body->oa.o_cksum;
1521                 /* 1 RC per niobuf */
1522                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1523                                      sizeof(__u32) * niocount);
1524         } else {
1525                 if (cli->cl_checksum &&
1526                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1527                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1528                                 body->oa.o_flags = 0;
1529                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1530                                 cli->cl_cksum_type);
1531                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1532                 }
1533
1534                 /* Client cksum has been already copied to wire obdo in previous
1535                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1536                  * resent due to cksum error, this will allow Server to
1537                  * check+dump pages on its side */
1538         }
1539         ptlrpc_request_set_replen(req);
1540
        /* record the request parameters in the request's async args */
1541         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1542         aa = ptlrpc_req_async_args(req);
1543         aa->aa_oa = oa;
1544         aa->aa_requested_nob = requested_nob;
1545         aa->aa_nio_count = niocount;
1546         aa->aa_page_count = page_count;
1547         aa->aa_resends = 0;
1548         aa->aa_ppga = pga;
1549         aa->aa_cli = cli;
1550         INIT_LIST_HEAD(&aa->aa_oaps);
1551
1552         *reqp = req;
        /* re-fetch the start of the niobuf array for the trace message */
1553         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1554         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1555                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1556                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1557         RETURN(0);
1558
1559  out:
1560         ptlrpc_req_finished(req);
1561         RETURN(rc);
1562 }
1563
/* Path of the checksum-error page dump file; a single global buffer that
 * dump_all_bulk_pages() fills with snprintf() before opening the file. */
1564 char dbgcksum_file_name[PATH_MAX];
1565
1566 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1567                                 struct brw_page **pga, __u32 server_cksum,
1568                                 __u32 client_cksum)
1569 {
1570         struct file *filp;
1571         int rc, i;
1572         unsigned int len;
1573         char *buf;
1574
1575         /* will only keep dump of pages on first error for the same range in
1576          * file/fid, not during the resends/retries. */
1577         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1578                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1579                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1580                   libcfs_debug_file_path_arr :
1581                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1582                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1583                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1584                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1585                  pga[0]->off,
1586                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1587                  client_cksum, server_cksum);
1588         filp = filp_open(dbgcksum_file_name,
1589                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1590         if (IS_ERR(filp)) {
1591                 rc = PTR_ERR(filp);
1592                 if (rc == -EEXIST)
1593                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1594                                "checksum error: rc = %d\n", dbgcksum_file_name,
1595                                rc);
1596                 else
1597                         CERROR("%s: can't open to dump pages with checksum "
1598                                "error: rc = %d\n", dbgcksum_file_name, rc);
1599                 return;
1600         }
1601
1602         for (i = 0; i < page_count; i++) {
1603                 len = pga[i]->count;
1604                 buf = kmap(pga[i]->pg);
1605                 while (len != 0) {
1606                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1607                         if (rc < 0) {
1608                                 CERROR("%s: wanted to write %u but got %d "
1609                                        "error\n", dbgcksum_file_name, len, rc);
1610                                 break;
1611                         }
1612                         len -= rc;
1613                         buf += rc;
1614                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1615                                dbgcksum_file_name, rc);
1616                 }
1617                 kunmap(pga[i]->pg);
1618         }
1619
1620         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1621         if (rc)
1622                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1623         filp_close(filp, NULL);
1624         return;
1625 }
1626
1627 static int
1628 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1629                      __u32 client_cksum, __u32 server_cksum,
1630                      struct osc_brw_async_args *aa)
1631 {
1632         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1633         enum cksum_types cksum_type;
1634         obd_dif_csum_fn *fn = NULL;
1635         int sector_size = 0;
1636         __u32 new_cksum;
1637         char *msg;
1638         int rc;
1639
1640         if (server_cksum == client_cksum) {
1641                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1642                 return 0;
1643         }
1644
1645         if (aa->aa_cli->cl_checksum_dump)
1646                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1647                                     server_cksum, client_cksum);
1648
1649         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1650                                            oa->o_flags : 0);
1651
1652         switch (cksum_type) {
1653         case OBD_CKSUM_T10IP512:
1654                 fn = obd_dif_ip_fn;
1655                 sector_size = 512;
1656                 break;
1657         case OBD_CKSUM_T10IP4K:
1658                 fn = obd_dif_ip_fn;
1659                 sector_size = 4096;
1660                 break;
1661         case OBD_CKSUM_T10CRC512:
1662                 fn = obd_dif_crc_fn;
1663                 sector_size = 512;
1664                 break;
1665         case OBD_CKSUM_T10CRC4K:
1666                 fn = obd_dif_crc_fn;
1667                 sector_size = 4096;
1668                 break;
1669         default:
1670                 break;
1671         }
1672
1673         if (fn)
1674                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1675                                              aa->aa_page_count, aa->aa_ppga,
1676                                              OST_WRITE, fn, sector_size,
1677                                              &new_cksum);
1678         else
1679                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1680                                        aa->aa_ppga, OST_WRITE, cksum_type,
1681                                        &new_cksum);
1682
1683         if (rc < 0)
1684                 msg = "failed to calculate the client write checksum";
1685         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1686                 msg = "the server did not use the checksum type specified in "
1687                       "the original request - likely a protocol problem";
1688         else if (new_cksum == server_cksum)
1689                 msg = "changed on the client after we checksummed it - "
1690                       "likely false positive due to mmap IO (bug 11742)";
1691         else if (new_cksum == client_cksum)
1692                 msg = "changed in transit before arrival at OST";
1693         else
1694                 msg = "changed in transit AND doesn't match the original - "
1695                       "likely false positive due to mmap IO (bug 11742)";
1696
1697         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1698                            DFID " object "DOSTID" extent [%llu-%llu], original "
1699                            "client csum %x (type %x), server csum %x (type %x),"
1700                            " client csum now %x\n",
1701                            obd_name, msg, libcfs_nid2str(peer->nid),
1702                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1703                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1704                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1705                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1706                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1707                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1708                            client_cksum,
1709                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1710                            server_cksum, cksum_type, new_cksum);
1711         return 1;
1712 }
1713
1714 /* Note rc enters this function as number of bytes transferred */
1715 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1716 {
1717         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1718         struct client_obd *cli = aa->aa_cli;
1719         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1720         const struct lnet_process_id *peer =
1721                 &req->rq_import->imp_connection->c_peer;
1722         struct ost_body *body;
1723         u32 client_cksum = 0;
1724         ENTRY;
1725
1726         if (rc < 0 && rc != -EDQUOT) {
1727                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1728                 RETURN(rc);
1729         }
1730
1731         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1732         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1733         if (body == NULL) {
1734                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1735                 RETURN(-EPROTO);
1736         }
1737
1738         /* set/clear over quota flag for a uid/gid/projid */
1739         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1740             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1741                 unsigned qid[LL_MAXQUOTAS] = {
1742                                          body->oa.o_uid, body->oa.o_gid,
1743                                          body->oa.o_projid };
1744                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1745                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1746                        body->oa.o_valid, body->oa.o_flags);
1747                        osc_quota_setdq(cli, qid, body->oa.o_valid,
1748                                        body->oa.o_flags);
1749         }
1750
1751         osc_update_grant(cli, body);
1752
1753         if (rc < 0)
1754                 RETURN(rc);
1755
1756         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1757                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1758
1759         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1760                 if (rc > 0) {
1761                         CERROR("Unexpected +ve rc %d\n", rc);
1762                         RETURN(-EPROTO);
1763                 }
1764
1765                 if (req->rq_bulk != NULL &&
1766                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1767                         RETURN(-EAGAIN);
1768
1769                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1770                     check_write_checksum(&body->oa, peer, client_cksum,
1771                                          body->oa.o_cksum, aa))
1772                         RETURN(-EAGAIN);
1773
1774                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1775                                      aa->aa_page_count, aa->aa_ppga);
1776                 GOTO(out, rc);
1777         }
1778
1779         /* The rest of this function executes only for OST_READs */
1780
1781         if (req->rq_bulk == NULL) {
1782                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1783                                           RCL_SERVER);
1784                 LASSERT(rc == req->rq_status);
1785         } else {
1786                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1787                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1788         }
1789         if (rc < 0)
1790                 GOTO(out, rc = -EAGAIN);
1791
1792         if (rc > aa->aa_requested_nob) {
1793                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1794                        aa->aa_requested_nob);
1795                 RETURN(-EPROTO);
1796         }
1797
1798         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1799                 CERROR ("Unexpected rc %d (%d transferred)\n",
1800                         rc, req->rq_bulk->bd_nob_transferred);
1801                 return (-EPROTO);
1802         }
1803
1804         if (req->rq_bulk == NULL) {
1805                 /* short io */
1806                 int nob, pg_count, i = 0;
1807                 unsigned char *buf;
1808
1809                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1810                 pg_count = aa->aa_page_count;
1811                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1812                                                    rc);
1813                 nob = rc;
1814                 while (nob > 0 && pg_count > 0) {
1815                         unsigned char *ptr;
1816                         int count = aa->aa_ppga[i]->count > nob ?
1817                                     nob : aa->aa_ppga[i]->count;
1818
1819                         CDEBUG(D_CACHE, "page %p count %d\n",
1820                                aa->aa_ppga[i]->pg, count);
1821                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1822                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1823                                count);
1824                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1825
1826                         buf += count;
1827                         nob -= count;
1828                         i++;
1829                         pg_count--;
1830                 }
1831         }
1832
1833         if (rc < aa->aa_requested_nob)
1834                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1835
1836         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1837                 static int cksum_counter;
1838                 u32        server_cksum = body->oa.o_cksum;
1839                 char      *via = "";
1840                 char      *router = "";
1841                 enum cksum_types cksum_type;
1842                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1843                         body->oa.o_flags : 0;
1844
1845                 cksum_type = obd_cksum_type_unpack(o_flags);
1846                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1847                                           aa->aa_page_count, aa->aa_ppga,
1848                                           OST_READ, &client_cksum);
1849                 if (rc < 0)
1850                         GOTO(out, rc);
1851
1852                 if (req->rq_bulk != NULL &&
1853                     peer->nid != req->rq_bulk->bd_sender) {
1854                         via = " via ";
1855                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1856                 }
1857
1858                 if (server_cksum != client_cksum) {
1859                         struct ost_body *clbody;
1860                         u32 page_count = aa->aa_page_count;
1861
1862                         clbody = req_capsule_client_get(&req->rq_pill,
1863                                                         &RMF_OST_BODY);
1864                         if (cli->cl_checksum_dump)
1865                                 dump_all_bulk_pages(&clbody->oa, page_count,
1866                                                     aa->aa_ppga, server_cksum,
1867                                                     client_cksum);
1868
1869                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1870                                            "%s%s%s inode "DFID" object "DOSTID
1871                                            " extent [%llu-%llu], client %x, "
1872                                            "server %x, cksum_type %x\n",
1873                                            obd_name,
1874                                            libcfs_nid2str(peer->nid),
1875                                            via, router,
1876                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1877                                                 clbody->oa.o_parent_seq : 0ULL,
1878                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1879                                                 clbody->oa.o_parent_oid : 0,
1880                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1881                                                 clbody->oa.o_parent_ver : 0,
1882                                            POSTID(&body->oa.o_oi),
1883                                            aa->aa_ppga[0]->off,
1884                                            aa->aa_ppga[page_count-1]->off +
1885                                            aa->aa_ppga[page_count-1]->count - 1,
1886                                            client_cksum, server_cksum,
1887                                            cksum_type);
1888                         cksum_counter = 0;
1889                         aa->aa_oa->o_cksum = client_cksum;
1890                         rc = -EAGAIN;
1891                 } else {
1892                         cksum_counter++;
1893                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1894                         rc = 0;
1895                 }
1896         } else if (unlikely(client_cksum)) {
1897                 static int cksum_missed;
1898
1899                 cksum_missed++;
1900                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1901                         CERROR("Checksum %u requested from %s but not sent\n",
1902                                cksum_missed, libcfs_nid2str(peer->nid));
1903         } else {
1904                 rc = 0;
1905         }
1906 out:
1907         if (rc >= 0)
1908                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1909                                      aa->aa_oa, &body->oa);
1910
1911         RETURN(rc);
1912 }
1913
1914 static int osc_brw_redo_request(struct ptlrpc_request *request,
1915                                 struct osc_brw_async_args *aa, int rc)
1916 {
1917         struct ptlrpc_request *new_req;
1918         struct osc_brw_async_args *new_aa;
1919         struct osc_async_page *oap;
1920         ENTRY;
1921
1922         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1923                   "redo for recoverable error %d", rc);
1924
1925         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1926                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1927                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1928                                   aa->aa_ppga, &new_req, 1);
1929         if (rc)
1930                 RETURN(rc);
1931
1932         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1933                 if (oap->oap_request != NULL) {
1934                         LASSERTF(request == oap->oap_request,
1935                                  "request %p != oap_request %p\n",
1936                                  request, oap->oap_request);
1937                         if (oap->oap_interrupted) {
1938                                 ptlrpc_req_finished(new_req);
1939                                 RETURN(-EINTR);
1940                         }
1941                 }
1942         }
1943         /*
1944          * New request takes over pga and oaps from old request.
1945          * Note that copying a list_head doesn't work, need to move it...
1946          */
1947         aa->aa_resends++;
1948         new_req->rq_interpret_reply = request->rq_interpret_reply;
1949         new_req->rq_async_args = request->rq_async_args;
1950         new_req->rq_commit_cb = request->rq_commit_cb;
1951         /* cap resend delay to the current request timeout, this is similar to
1952          * what ptlrpc does (see after_reply()) */
1953         if (aa->aa_resends > new_req->rq_timeout)
1954                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1955         else
1956                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1957         new_req->rq_generation_set = 1;
1958         new_req->rq_import_generation = request->rq_import_generation;
1959
1960         new_aa = ptlrpc_req_async_args(new_req);
1961
1962         INIT_LIST_HEAD(&new_aa->aa_oaps);
1963         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1964         INIT_LIST_HEAD(&new_aa->aa_exts);
1965         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1966         new_aa->aa_resends = aa->aa_resends;
1967
1968         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1969                 if (oap->oap_request) {
1970                         ptlrpc_req_finished(oap->oap_request);
1971                         oap->oap_request = ptlrpc_request_addref(new_req);
1972                 }
1973         }
1974
1975         /* XXX: This code will run into problem if we're going to support
1976          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1977          * and wait for all of them to be finished. We should inherit request
1978          * set from old request. */
1979         ptlrpcd_add_req(new_req);
1980
1981         DEBUG_REQ(D_INFO, new_req, "new request");
1982         RETURN(0);
1983 }
1984
1985 /*
1986  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1987  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1988  * fine for our small page arrays and doesn't require allocation.  its an
1989  * insertion sort that swaps elements that are strides apart, shrinking the
1990  * stride down until its '1' and the array is sorted.
1991  */
1992 static void sort_brw_pages(struct brw_page **array, int num)
1993 {
1994         int stride, i, j;
1995         struct brw_page *tmp;
1996
1997         if (num == 1)
1998                 return;
1999         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2000                 ;
2001
2002         do {
2003                 stride /= 3;
2004                 for (i = stride ; i < num ; i++) {
2005                         tmp = array[i];
2006                         j = i;
2007                         while (j >= stride && array[j - stride]->off > tmp->off) {
2008                                 array[j] = array[j - stride];
2009                                 j -= stride;
2010                         }
2011                         array[j] = tmp;
2012                 }
2013         } while (stride > 1);
2014 }
2015
2016 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2017 {
2018         LASSERT(ppga != NULL);
2019         OBD_FREE(ppga, sizeof(*ppga) * count);
2020 }
2021
2022 static int brw_interpret(const struct lu_env *env,
2023                          struct ptlrpc_request *req, void *args, int rc)
2024 {
2025         struct osc_brw_async_args *aa = args;
2026         struct osc_extent *ext;
2027         struct osc_extent *tmp;
2028         struct client_obd *cli = aa->aa_cli;
2029         unsigned long transferred = 0;
2030
2031         ENTRY;
2032
2033         rc = osc_brw_fini_request(req, rc);
2034         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2035         /*
2036          * When server returns -EINPROGRESS, client should always retry
2037          * regardless of the number of times the bulk was resent already.
2038          */
2039         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2040                 if (req->rq_import_generation !=
2041                     req->rq_import->imp_generation) {
2042                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2043                                ""DOSTID", rc = %d.\n",
2044                                req->rq_import->imp_obd->obd_name,
2045                                POSTID(&aa->aa_oa->o_oi), rc);
2046                 } else if (rc == -EINPROGRESS ||
2047                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2048                         rc = osc_brw_redo_request(req, aa, rc);
2049                 } else {
2050                         CERROR("%s: too many resent retries for object: "
2051                                "%llu:%llu, rc = %d.\n",
2052                                req->rq_import->imp_obd->obd_name,
2053                                POSTID(&aa->aa_oa->o_oi), rc);
2054                 }
2055
2056                 if (rc == 0)
2057                         RETURN(0);
2058                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2059                         rc = -EIO;
2060         }
2061
2062         if (rc == 0) {
2063                 struct obdo *oa = aa->aa_oa;
2064                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2065                 unsigned long valid = 0;
2066                 struct cl_object *obj;
2067                 struct osc_async_page *last;
2068
2069                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2070                 obj = osc2cl(last->oap_obj);
2071
2072                 cl_object_attr_lock(obj);
2073                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2074                         attr->cat_blocks = oa->o_blocks;
2075                         valid |= CAT_BLOCKS;
2076                 }
2077                 if (oa->o_valid & OBD_MD_FLMTIME) {
2078                         attr->cat_mtime = oa->o_mtime;
2079                         valid |= CAT_MTIME;
2080                 }
2081                 if (oa->o_valid & OBD_MD_FLATIME) {
2082                         attr->cat_atime = oa->o_atime;
2083                         valid |= CAT_ATIME;
2084                 }
2085                 if (oa->o_valid & OBD_MD_FLCTIME) {
2086                         attr->cat_ctime = oa->o_ctime;
2087                         valid |= CAT_CTIME;
2088                 }
2089
2090                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2091                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2092                         loff_t last_off = last->oap_count + last->oap_obj_off +
2093                                 last->oap_page_off;
2094
2095                         /* Change file size if this is an out of quota or
2096                          * direct IO write and it extends the file size */
2097                         if (loi->loi_lvb.lvb_size < last_off) {
2098                                 attr->cat_size = last_off;
2099                                 valid |= CAT_SIZE;
2100                         }
2101                         /* Extend KMS if it's not a lockless write */
2102                         if (loi->loi_kms < last_off &&
2103                             oap2osc_page(last)->ops_srvlock == 0) {
2104                                 attr->cat_kms = last_off;
2105                                 valid |= CAT_KMS;
2106                         }
2107                 }
2108
2109                 if (valid != 0)
2110                         cl_object_attr_update(env, obj, attr, valid);
2111                 cl_object_attr_unlock(obj);
2112         }
2113         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2114
2115         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2116                 osc_inc_unstable_pages(req);
2117
2118         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2119                 list_del_init(&ext->oe_link);
2120                 osc_extent_finish(env, ext, 1,
2121                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2122         }
2123         LASSERT(list_empty(&aa->aa_exts));
2124         LASSERT(list_empty(&aa->aa_oaps));
2125
2126         transferred = (req->rq_bulk == NULL ? /* short io */
2127                        aa->aa_requested_nob :
2128                        req->rq_bulk->bd_nob_transferred);
2129
2130         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2131         ptlrpc_lprocfs_brw(req, transferred);
2132
2133         spin_lock(&cli->cl_loi_list_lock);
2134         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2135          * is called so we know whether to go to sync BRWs or wait for more
2136          * RPCs to complete */
2137         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2138                 cli->cl_w_in_flight--;
2139         else
2140                 cli->cl_r_in_flight--;
2141         osc_wake_cache_waiters(cli);
2142         spin_unlock(&cli->cl_loi_list_lock);
2143
2144         osc_io_unplug(env, cli, NULL);
2145         RETURN(rc);
2146 }
2147
2148 static void brw_commit(struct ptlrpc_request *req)
2149 {
2150         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2151          * this called via the rq_commit_cb, I need to ensure
2152          * osc_dec_unstable_pages is still called. Otherwise unstable
2153          * pages may be leaked. */
2154         spin_lock(&req->rq_lock);
2155         if (likely(req->rq_unstable)) {
2156                 req->rq_unstable = 0;
2157                 spin_unlock(&req->rq_lock);
2158
2159                 osc_dec_unstable_pages(req);
2160         } else {
2161                 req->rq_committed = 1;
2162                 spin_unlock(&req->rq_lock);
2163         }
2164 }
2165
2166 /**
2167  * Build an RPC by the list of extent @ext_list. The caller must ensure
2168  * that the total pages in this list are NOT over max pages per RPC.
2169  * Extents in the list must be in OES_RPC state.
2170  */
2171 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2172                   struct list_head *ext_list, int cmd)
2173 {
2174         struct ptlrpc_request           *req = NULL;
2175         struct osc_extent               *ext;
2176         struct brw_page                 **pga = NULL;
2177         struct osc_brw_async_args       *aa = NULL;
2178         struct obdo                     *oa = NULL;
2179         struct osc_async_page           *oap;
2180         struct osc_object               *obj = NULL;
2181         struct cl_req_attr              *crattr = NULL;
2182         loff_t                          starting_offset = OBD_OBJECT_EOF;
2183         loff_t                          ending_offset = 0;
2184         int                             mpflag = 0;
2185         int                             mem_tight = 0;
2186         int                             page_count = 0;
2187         bool                            soft_sync = false;
2188         bool                            interrupted = false;
2189         bool                            ndelay = false;
2190         int                             i;
2191         int                             grant = 0;
2192         int                             rc;
2193         __u32                           layout_version = 0;
2194         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2195         struct ost_body                 *body;
2196         ENTRY;
2197         LASSERT(!list_empty(ext_list));
2198
2199         /* add pages into rpc_list to build BRW rpc */
2200         list_for_each_entry(ext, ext_list, oe_link) {
2201                 LASSERT(ext->oe_state == OES_RPC);
2202                 mem_tight |= ext->oe_memalloc;
2203                 grant += ext->oe_grants;
2204                 page_count += ext->oe_nr_pages;
2205                 layout_version = MAX(layout_version, ext->oe_layout_version);
2206                 if (obj == NULL)
2207                         obj = ext->oe_obj;
2208         }
2209
2210         soft_sync = osc_over_unstable_soft_limit(cli);
2211         if (mem_tight)
2212                 mpflag = cfs_memory_pressure_get_and_set();
2213
2214         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2215         if (pga == NULL)
2216                 GOTO(out, rc = -ENOMEM);
2217
2218         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2219         if (oa == NULL)
2220                 GOTO(out, rc = -ENOMEM);
2221
2222         i = 0;
2223         list_for_each_entry(ext, ext_list, oe_link) {
2224                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2225                         if (mem_tight)
2226                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2227                         if (soft_sync)
2228                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2229                         pga[i] = &oap->oap_brw_page;
2230                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2231                         i++;
2232
2233                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2234                         if (starting_offset == OBD_OBJECT_EOF ||
2235                             starting_offset > oap->oap_obj_off)
2236                                 starting_offset = oap->oap_obj_off;
2237                         else
2238                                 LASSERT(oap->oap_page_off == 0);
2239                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2240                                 ending_offset = oap->oap_obj_off +
2241                                                 oap->oap_count;
2242                         else
2243                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2244                                         PAGE_SIZE);
2245                         if (oap->oap_interrupted)
2246                                 interrupted = true;
2247                 }
2248                 if (ext->oe_ndelay)
2249                         ndelay = true;
2250         }
2251
2252         /* first page in the list */
2253         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2254
2255         crattr = &osc_env_info(env)->oti_req_attr;
2256         memset(crattr, 0, sizeof(*crattr));
2257         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2258         crattr->cra_flags = ~0ULL;
2259         crattr->cra_page = oap2cl_page(oap);
2260         crattr->cra_oa = oa;
2261         cl_req_attr_set(env, osc2cl(obj), crattr);
2262
2263         if (cmd == OBD_BRW_WRITE) {
2264                 oa->o_grant_used = grant;
2265                 if (layout_version > 0) {
2266                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2267                                PFID(&oa->o_oi.oi_fid), layout_version);
2268
2269                         oa->o_layout_version = layout_version;
2270                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2271                 }
2272         }
2273
2274         sort_brw_pages(pga, page_count);
2275         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2276         if (rc != 0) {
2277                 CERROR("prep_req failed: %d\n", rc);
2278                 GOTO(out, rc);
2279         }
2280
2281         req->rq_commit_cb = brw_commit;
2282         req->rq_interpret_reply = brw_interpret;
2283         req->rq_memalloc = mem_tight != 0;
2284         oap->oap_request = ptlrpc_request_addref(req);
2285         if (interrupted && !req->rq_intr)
2286                 ptlrpc_mark_interrupted(req);
2287         if (ndelay) {
2288                 req->rq_no_resend = req->rq_no_delay = 1;
2289                 /* probably set a shorter timeout value.
2290                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2291                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2292         }
2293
2294         /* Need to update the timestamps after the request is built in case
2295          * we race with setattr (locally or in queue at OST).  If OST gets
2296          * later setattr before earlier BRW (as determined by the request xid),
2297          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2298          * way to do this in a single call.  bug 10150 */
2299         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2300         crattr->cra_oa = &body->oa;
2301         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2302         cl_req_attr_set(env, osc2cl(obj), crattr);
2303         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2304
2305         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2306         aa = ptlrpc_req_async_args(req);
2307         INIT_LIST_HEAD(&aa->aa_oaps);
2308         list_splice_init(&rpc_list, &aa->aa_oaps);
2309         INIT_LIST_HEAD(&aa->aa_exts);
2310         list_splice_init(ext_list, &aa->aa_exts);
2311
2312         spin_lock(&cli->cl_loi_list_lock);
2313         starting_offset >>= PAGE_SHIFT;
2314         if (cmd == OBD_BRW_READ) {
2315                 cli->cl_r_in_flight++;
2316                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2317                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2318                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2319                                       starting_offset + 1);
2320         } else {
2321                 cli->cl_w_in_flight++;
2322                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2323                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2324                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2325                                       starting_offset + 1);
2326         }
2327         spin_unlock(&cli->cl_loi_list_lock);
2328
2329         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2330                   page_count, aa, cli->cl_r_in_flight,
2331                   cli->cl_w_in_flight);
2332         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2333
2334         ptlrpcd_add_req(req);
2335         rc = 0;
2336         EXIT;
2337
2338 out:
2339         if (mem_tight != 0)
2340                 cfs_memory_pressure_restore(mpflag);
2341
2342         if (rc != 0) {
2343                 LASSERT(req == NULL);
2344
2345                 if (oa)
2346                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2347                 if (pga)
2348                         OBD_FREE(pga, sizeof(*pga) * page_count);
2349                 /* this should happen rarely and is pretty bad, it makes the
2350                  * pending list not follow the dirty order */
2351                 while (!list_empty(ext_list)) {
2352                         ext = list_entry(ext_list->next, struct osc_extent,
2353                                          oe_link);
2354                         list_del_init(&ext->oe_link);
2355                         osc_extent_finish(env, ext, 0, rc);
2356                 }
2357         }
2358         RETURN(rc);
2359 }
2360
2361 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2362 {
2363         int set = 0;
2364
2365         LASSERT(lock != NULL);
2366
2367         lock_res_and_lock(lock);
2368
2369         if (lock->l_ast_data == NULL)
2370                 lock->l_ast_data = data;
2371         if (lock->l_ast_data == data)
2372                 set = 1;
2373
2374         unlock_res_and_lock(lock);
2375
2376         return set;
2377 }
2378
2379 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2380                      void *cookie, struct lustre_handle *lockh,
2381                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2382                      int errcode)
2383 {
2384         bool intent = *flags & LDLM_FL_HAS_INTENT;
2385         int rc;
2386         ENTRY;
2387
2388         /* The request was created before ldlm_cli_enqueue call. */
2389         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2390                 struct ldlm_reply *rep;
2391
2392                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2393                 LASSERT(rep != NULL);
2394
2395                 rep->lock_policy_res1 =
2396                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2397                 if (rep->lock_policy_res1)
2398                         errcode = rep->lock_policy_res1;
2399                 if (!speculative)
2400                         *flags |= LDLM_FL_LVB_READY;
2401         } else if (errcode == ELDLM_OK) {
2402                 *flags |= LDLM_FL_LVB_READY;
2403         }
2404
2405         /* Call the update callback. */
2406         rc = (*upcall)(cookie, lockh, errcode);
2407
2408         /* release the reference taken in ldlm_cli_enqueue() */
2409         if (errcode == ELDLM_LOCK_MATCHED)
2410                 errcode = ELDLM_OK;
2411         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2412                 ldlm_lock_decref(lockh, mode);
2413
2414         RETURN(rc);
2415 }
2416
2417 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2418                           void *args, int rc)
2419 {
2420         struct osc_enqueue_args *aa = args;
2421         struct ldlm_lock *lock;
2422         struct lustre_handle *lockh = &aa->oa_lockh;
2423         enum ldlm_mode mode = aa->oa_mode;
2424         struct ost_lvb *lvb = aa->oa_lvb;
2425         __u32 lvb_len = sizeof(*lvb);
2426         __u64 flags = 0;
2427
2428         ENTRY;
2429
2430         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2431          * be valid. */
2432         lock = ldlm_handle2lock(lockh);
2433         LASSERTF(lock != NULL,
2434                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2435                  lockh->cookie, req, aa);
2436
2437         /* Take an additional reference so that a blocking AST that
2438          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2439          * to arrive after an upcall has been executed by
2440          * osc_enqueue_fini(). */
2441         ldlm_lock_addref(lockh, mode);
2442
2443         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2444         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2445
2446         /* Let CP AST to grant the lock first. */
2447         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2448
2449         if (aa->oa_speculative) {
2450                 LASSERT(aa->oa_lvb == NULL);
2451                 LASSERT(aa->oa_flags == NULL);
2452                 aa->oa_flags = &flags;
2453         }
2454
2455         /* Complete obtaining the lock procedure. */
2456         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2457                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2458                                    lockh, rc);
2459         /* Complete osc stuff. */
2460         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2461                               aa->oa_flags, aa->oa_speculative, rc);
2462
2463         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2464
2465         ldlm_lock_decref(lockh, mode);
2466         LDLM_LOCK_PUT(lock);
2467         RETURN(rc);
2468 }
2469
/* Sentinel request-set value; it is compared by address only.  When
 * osc_enqueue_base() is given rqset == PTLRPCD_SET it hands the async
 * request to ptlrpcd_add_req() instead of adding it to a request set. */
2470 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2471
2472 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2473  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2474  * other synchronous requests, however keeping some locks and trying to obtain
2475  * others may take a considerable amount of time in a case of ost failure; and
2476  * when other sync requests do not get released lock from a client, the client
2477  * is evicted from the cluster -- such scenarios make life difficult, so
2478  * release locks just after they are obtained.
      *
      * Summary of the code below:
      * - The extent in policy is widened in place to page boundaries.
      * - If kms_valid is set, an existing lock is looked up with
      *   ldlm_lock_match() (a PR request also accepts PW).  On a match:
      *   with LDLM_FL_TEST_LOCK set, ELDLM_OK is returned at once; a
      *   speculative request returns -EEXIST when the matched extent equals
      *   the requested one and -ECANCELED otherwise; else, if
      *   einfo->ei_cbdata can be attached to the lock, upcall is invoked
      *   with ELDLM_LOCK_MATCHED and ELDLM_OK is returned.
      * - With no usable match, -ENOLCK is returned when LDLM_FL_TEST_LOCK or
      *   LDLM_FL_MATCH_LOCK is set; otherwise ldlm_cli_enqueue() is called.
      * - If async is set, a successful enqueue queues the request (to
      *   ptlrpcd when rqset is PTLRPCD_SET, otherwise on rqset) with
      *   osc_enqueue_interpret() as reply handler, and the enqueue rc is
      *   returned.  If async is not set, osc_enqueue_fini() runs here and
      *   its result is returned.
      */
2479 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2480                      __u64 *flags, union ldlm_policy_data *policy,
2481                      struct ost_lvb *lvb, int kms_valid,
2482                      osc_enqueue_upcall_f upcall, void *cookie,
2483                      struct ldlm_enqueue_info *einfo,
2484                      struct ptlrpc_request_set *rqset, int async,
2485                      bool speculative)
2486 {
2487         struct obd_device *obd = exp->exp_obd;
2488         struct lustre_handle lockh = { 0 };
2489         struct ptlrpc_request *req = NULL;
2490         int intent = *flags & LDLM_FL_HAS_INTENT;
2491         __u64 match_flags = *flags;
2492         enum ldlm_mode mode;
2493         int rc;
2494         ENTRY;
2495
2496         /* Filesystem lock extents are extended to page boundaries so that
2497          * dealing with the page cache is a little smoother.  */
2498         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2499         policy->l_extent.end |= ~PAGE_MASK;
2500
2501         /*
2502          * kms is not valid when either object is completely fresh (so that no
2503          * locks are cached), or object was evicted. In the latter case cached
2504          * lock cannot be used, because it would prime inode state with
2505          * potentially stale LVB.
2506          */
2507         if (!kms_valid)
2508                 goto no_match;
2509
2510         /* Next, search for already existing extent locks that will cover us */
2511         /* If we're trying to read, we also search for an existing PW lock.  The
2512          * VFS and page cache already protect us locally, so lots of readers/
2513          * writers can share a single PW lock.
2514          *
2515          * There are problems with conversion deadlocks, so instead of
2516          * converting a read lock to a write lock, we'll just enqueue a new
2517          * one.
2518          *
2519          * At some point we should cancel the read lock instead of making them
2520          * send us a blocking callback, but there are problems with canceling
2521          * locks out from other users right now, too. */
2522         mode = einfo->ei_mode;
2523         if (einfo->ei_mode == LCK_PR)
2524                 mode |= LCK_PW;
2525         /* Normal lock requests must wait for the LVB to be ready before
2526          * matching a lock; speculative lock requests do not need to,
2527          * because they will not actually use the lock. */
2528         if (!speculative)
2529                 match_flags |= LDLM_FL_LVB_READY;
2530         if (intent != 0)
2531                 match_flags |= LDLM_FL_BLOCK_GRANTED;
             /* From here on "mode" holds the mode of the matched lock, or 0
              * when nothing matched. */
2532         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2533                                einfo->ei_type, policy, mode, &lockh, 0);
2534         if (mode) {
2535                 struct ldlm_lock *matched;
2536
                     /* The caller only asked whether a lock exists: report
                      * success without running the upcall. */
2537                 if (*flags & LDLM_FL_TEST_LOCK)
2538                         RETURN(ELDLM_OK);
2539
2540                 matched = ldlm_handle2lock(&lockh);
2541                 if (speculative) {
2542                         /* This DLM lock request is speculative, and does not
2543                          * have an associated IO request. Therefore if there
2544                          * is already a DLM lock, it will just inform the
2545                          * caller to cancel the request for this stripe.*/
2546                         lock_res_and_lock(matched);
2547                         if (ldlm_extent_equal(&policy->l_extent,
2548                             &matched->l_policy_data.l_extent))
2549                                 rc = -EEXIST;
2550                         else
2551                                 rc = -ECANCELED;
2552                         unlock_res_and_lock(matched);
2553
2554                         ldlm_lock_decref(&lockh, mode);
2555                         LDLM_LOCK_PUT(matched);
2556                         RETURN(rc);
2557                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2558                         *flags |= LDLM_FL_LVB_READY;
2559
2560                         /* We already have a lock, and it's referenced. */
2561                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2562
2563                         ldlm_lock_decref(&lockh, mode);
2564                         LDLM_LOCK_PUT(matched);
2565                         RETURN(ELDLM_OK);
2566                 } else {
                             /* The matched lock already carries different
                              * AST data: give the match up and fall through
                              * to enqueue a new lock. */
2567                         ldlm_lock_decref(&lockh, mode);
2568                         LDLM_LOCK_PUT(matched);
2569                 }
2570         }
2571
2572 no_match:
             /* Match-only requests end here and never start a new enqueue. */
2573         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2574                 RETURN(-ENOLCK);
2575
             /* An intent enqueue uses a request prepared here, with room in
              * the reply for an LVB the size of struct ost_lvb.  Otherwise
              * req stays NULL when passed to ldlm_cli_enqueue(). */
2576         if (intent) {
2577                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2578                                            &RQF_LDLM_ENQUEUE_LVB);
2579                 if (req == NULL)
2580                         RETURN(-ENOMEM);
2581
2582                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2583                 if (rc) {
2584                         ptlrpc_request_free(req);
2585                         RETURN(rc);
2586                 }
2587
2588                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2589                                      sizeof *lvb);
2590                 ptlrpc_request_set_replen(req);
2591         }
2592
2593         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2594         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2595
2596         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2597                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2598         if (async) {
2599                 if (!rc) {
                             /* Store in the request's async args what
                              * osc_enqueue_interpret() reads back, then
                              * queue the request. */
2600                         struct osc_enqueue_args *aa;
2601                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2602                         aa = ptlrpc_req_async_args(req);
2603                         aa->oa_exp         = exp;
2604                         aa->oa_mode        = einfo->ei_mode;
2605                         aa->oa_type        = einfo->ei_type;
2606                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2607                         aa->oa_upcall      = upcall;
2608                         aa->oa_cookie      = cookie;
2609                         aa->oa_speculative = speculative;
2610                         if (!speculative) {
2611                                 aa->oa_flags  = flags;
2612                                 aa->oa_lvb    = lvb;
2613                         } else {
2614                                 /* speculative locks are essentially to enqueue
2615                                  * a DLM lock  in advance, so we don't care
2616                                  * about the result of the enqueue. */
2617                                 aa->oa_lvb    = NULL;
2618                                 aa->oa_flags  = NULL;
2619                         }
2620
2621                         req->rq_interpret_reply = osc_enqueue_interpret;
2622                         if (rqset == PTLRPCD_SET)
2623                                 ptlrpcd_add_req(req);
2624                         else
2625                                 ptlrpc_set_add_req(rqset, req);
2626                 } else if (intent) {
                             /* The enqueue failed: release the request that
                              * was allocated above for the intent case. */
2627                         ptlrpc_req_finished(req);
2628                 }
2629                 RETURN(rc);
2630         }
2631
             /* Synchronous case: finish the enqueue inline. */
2632         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2633                               flags, speculative, rc);
2634         if (intent)
2635                 ptlrpc_req_finished(req);
2636
2637         RETURN(rc);
2638 }
2639
2640 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2641                    enum ldlm_type type, union ldlm_policy_data *policy,
2642                    enum ldlm_mode mode, __u64 *flags, void *data,
2643                    struct lustre_handle *lockh, int unref)
2644 {
2645         struct obd_device *obd = exp->exp_obd;
2646         __u64 lflags = *flags;
2647         enum ldlm_mode rc;
2648         ENTRY;
2649
2650         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2651                 RETURN(-EIO);
2652
2653         /* Filesystem lock extents are extended to page boundaries so that
2654          * dealing with the page cache is a little smoother */
2655         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2656         policy->l_extent.end |= ~PAGE_MASK;
2657
2658         /* Next, search for already existing extent locks that will cover us */
2659         /* If we're trying to read, we also search for an existing PW lock.  The
2660          * VFS and page cache already protect us locally, so lots of readers/
2661          * writers can share a single PW lock. */
2662         rc = mode;
2663         if (mode == LCK_PR)
2664                 rc |= LCK_PW;
2665         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2666                              res_id, type, policy, rc, lockh, unref);
2667         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2668                 RETURN(rc);
2669
2670         if (data != NULL) {
2671                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2672
2673                 LASSERT(lock != NULL);
2674                 if (!osc_set_lock_data(lock, data)) {
2675                         ldlm_lock_decref(lockh, rc);
2676                         rc = 0;
2677                 }
2678                 LDLM_LOCK_PUT(lock);
2679         }
2680         RETURN(rc);
2681 }
2682
2683 static int osc_statfs_interpret(const struct lu_env *env,
2684                                 struct ptlrpc_request *req, void *args, int rc)
2685 {
2686         struct osc_async_args *aa = args;
2687         struct obd_statfs *msfs;
2688
2689         ENTRY;
2690         if (rc == -EBADR)
2691                 /*
2692                  * The request has in fact never been sent due to issues at
2693                  * a higher level (LOV).  Exit immediately since the caller
2694                  * is aware of the problem and takes care of the clean up.
2695                  */
2696                 RETURN(rc);
2697
2698         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2699             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2700                 GOTO(out, rc = 0);
2701
2702         if (rc != 0)
2703                 GOTO(out, rc);
2704
2705         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2706         if (msfs == NULL)
2707                 GOTO(out, rc = -EPROTO);
2708
2709         *aa->aa_oi->oi_osfs = *msfs;
2710 out:
2711         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2712
2713         RETURN(rc);
2714 }
2715
2716 static int osc_statfs_async(struct obd_export *exp,
2717                             struct obd_info *oinfo, time64_t max_age,
2718                             struct ptlrpc_request_set *rqset)
2719 {
2720         struct obd_device     *obd = class_exp2obd(exp);
2721         struct ptlrpc_request *req;
2722         struct osc_async_args *aa;
2723         int rc;
2724         ENTRY;
2725
2726         /* We could possibly pass max_age in the request (as an absolute
2727          * timestamp or a "seconds.usec ago") so the target can avoid doing
2728          * extra calls into the filesystem if that isn't necessary (e.g.
2729          * during mount that would help a bit).  Having relative timestamps
2730          * is not so great if request processing is slow, while absolute
2731          * timestamps are not ideal because they need time synchronization. */
2732         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2733         if (req == NULL)
2734                 RETURN(-ENOMEM);
2735
2736         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2737         if (rc) {
2738                 ptlrpc_request_free(req);
2739                 RETURN(rc);
2740         }
2741         ptlrpc_request_set_replen(req);
2742         req->rq_request_portal = OST_CREATE_PORTAL;
2743         ptlrpc_at_set_req_timeout(req);
2744
2745         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2746                 /* procfs requests not want stat in wait for avoid deadlock */
2747                 req->rq_no_resend = 1;
2748                 req->rq_no_delay = 1;
2749         }
2750
2751         req->rq_interpret_reply = osc_statfs_interpret;
2752         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2753         aa = ptlrpc_req_async_args(req);
2754         aa->aa_oi = oinfo;
2755
2756         ptlrpc_set_add_req(rqset, req);
2757         RETURN(0);
2758 }
2759
2760 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2761                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2762 {
2763         struct obd_device     *obd = class_exp2obd(exp);
2764         struct obd_statfs     *msfs;
2765         struct ptlrpc_request *req;
2766         struct obd_import     *imp = NULL;
2767         int rc;
2768         ENTRY;
2769
2770
2771         /*Since the request might also come from lprocfs, so we need
2772          *sync this with client_disconnect_export Bug15684*/
2773         down_read(&obd->u.cli.cl_sem);
2774         if (obd->u.cli.cl_import)
2775                 imp = class_import_get(obd->u.cli.cl_import);
2776         up_read(&obd->u.cli.cl_sem);
2777         if (!imp)
2778                 RETURN(-ENODEV);
2779
2780         /* We could possibly pass max_age in the request (as an absolute
2781          * timestamp or a "seconds.usec ago") so the target can avoid doing
2782          * extra calls into the filesystem if that isn't necessary (e.g.
2783          * during mount that would help a bit).  Having relative timestamps
2784          * is not so great if request processing is slow, while absolute
2785          * timestamps are not ideal because they need time synchronization. */
2786         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2787
2788         class_import_put(imp);
2789
2790         if (req == NULL)
2791                 RETURN(-ENOMEM);
2792
2793         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2794         if (rc) {
2795                 ptlrpc_request_free(req);
2796                 RETURN(rc);
2797         }
2798         ptlrpc_request_set_replen(req);
2799         req->rq_request_portal = OST_CREATE_PORTAL;
2800         ptlrpc_at_set_req_timeout(req);
2801
2802         if (flags & OBD_STATFS_NODELAY) {
2803                 /* procfs requests not want stat in wait for avoid deadlock */
2804                 req->rq_no_resend = 1;
2805                 req->rq_no_delay = 1;
2806         }
2807
2808         rc = ptlrpc_queue_wait(req);
2809         if (rc)
2810                 GOTO(out, rc);
2811
2812         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2813         if (msfs == NULL)
2814                 GOTO(out, rc = -EPROTO);
2815
2816         *osfs = *msfs;
2817
2818         EXIT;
2819 out:
2820         ptlrpc_req_finished(req);
2821         return rc;
2822 }
2823
2824 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2825                          void *karg, void __user *uarg)
2826 {
2827         struct obd_device *obd = exp->exp_obd;
2828         struct obd_ioctl_data *data = karg;
2829         int err = 0;
2830         ENTRY;
2831
2832         if (!try_module_get(THIS_MODULE)) {
2833                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2834                        module_name(THIS_MODULE));
2835                 return -EINVAL;
2836         }
2837         switch (cmd) {
2838         case OBD_IOC_CLIENT_RECOVER:
2839                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2840                                             data->ioc_inlbuf1, 0);
2841                 if (err > 0)
2842                         err = 0;
2843                 GOTO(out, err);
2844         case IOC_OSC_SET_ACTIVE:
2845                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2846                                                data->ioc_offset);
2847                 GOTO(out, err);
2848         case OBD_IOC_PING_TARGET:
2849                 err = ptlrpc_obd_ping(obd);
2850                 GOTO(out, err);
2851         default:
2852                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2853                        cmd, current_comm());
2854                 GOTO(out, err = -ENOTTY);
2855         }
2856 out:
2857         module_put(THIS_MODULE);
2858         return err;
2859 }
2860
2861 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2862                        u32 keylen, void *key, u32 vallen, void *val,
2863                        struct ptlrpc_request_set *set)
2864 {
2865         struct ptlrpc_request *req;
2866         struct obd_device     *obd = exp->exp_obd;
2867         struct obd_import     *imp = class_exp2cliimp(exp);
2868         char                  *tmp;
2869         int                    rc;
2870         ENTRY;
2871
2872         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2873
2874         if (KEY_IS(KEY_CHECKSUM)) {
2875                 if (vallen != sizeof(int))
2876                         RETURN(-EINVAL);
2877                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2878                 RETURN(0);
2879         }
2880
2881         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2882                 sptlrpc_conf_client_adapt(obd);
2883                 RETURN(0);
2884         }
2885
2886         if (KEY_IS(KEY_FLUSH_CTX)) {
2887                 sptlrpc_import_flush_my_ctx(imp);
2888                 RETURN(0);
2889         }
2890
2891         if (KEY_IS(KEY_CACHE_SET)) {
2892                 struct client_obd *cli = &obd->u.cli;
2893
2894                 LASSERT(cli->cl_cache == NULL); /* only once */
2895                 cli->cl_cache = (struct cl_client_cache *)val;
2896                 cl_cache_incref(cli->cl_cache);
2897                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2898
2899                 /* add this osc into entity list */
2900                 LASSERT(list_empty(&cli->cl_lru_osc));
2901                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2902                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2903                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2904
2905                 RETURN(0);
2906         }
2907
2908         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2909                 struct client_obd *cli = &obd->u.cli;
2910                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2911                 long target = *(long *)val;
2912
2913                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2914                 *(long *)val -= nr;
2915                 RETURN(0);
2916         }
2917
2918         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2919                 RETURN(-EINVAL);
2920
2921         /* We pass all other commands directly to OST. Since nobody calls osc
2922            methods directly and everybody is supposed to go through LOV, we
2923            assume lov checked invalid values for us.
2924            The only recognised values so far are evict_by_nid and mds_conn.
2925            Even if something bad goes through, we'd get a -EINVAL from OST
2926            anyway. */
2927
2928         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2929                                                 &RQF_OST_SET_GRANT_INFO :
2930                                                 &RQF_OBD_SET_INFO);
2931         if (req == NULL)
2932                 RETURN(-ENOMEM);
2933
2934         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2935                              RCL_CLIENT, keylen);
2936         if (!KEY_IS(KEY_GRANT_SHRINK))
2937                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2938                                      RCL_CLIENT, vallen);
2939         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2940         if (rc) {
2941                 ptlrpc_request_free(req);
2942                 RETURN(rc);
2943         }
2944
2945         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2946         memcpy(tmp, key, keylen);
2947         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2948                                                         &RMF_OST_BODY :
2949                                                         &RMF_SETINFO_VAL);
2950         memcpy(tmp, val, vallen);
2951
2952         if (KEY_IS(KEY_GRANT_SHRINK)) {
2953                 struct osc_grant_args *aa;
2954                 struct obdo *oa;
2955
2956                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2957                 aa = ptlrpc_req_async_args(req);
2958                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2959                 if (!oa) {
2960                         ptlrpc_req_finished(req);
2961                         RETURN(-ENOMEM);
2962                 }
2963                 *oa = ((struct ost_body *)val)->oa;
2964                 aa->aa_oa = oa;
2965                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2966         }
2967
2968         ptlrpc_request_set_replen(req);
2969         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2970                 LASSERT(set != NULL);
2971                 ptlrpc_set_add_req(set, req);
2972                 ptlrpc_check_set(NULL, set);
2973         } else {
2974                 ptlrpcd_add_req(req);
2975         }
2976
2977         RETURN(0);
2978 }
2979 EXPORT_SYMBOL(osc_set_info_async);
2980
2981 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2982                   struct obd_device *obd, struct obd_uuid *cluuid,
2983                   struct obd_connect_data *data, void *localdata)
2984 {
2985         struct client_obd *cli = &obd->u.cli;
2986
2987         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2988                 long lost_grant;
2989                 long grant;
2990
2991                 spin_lock(&cli->cl_loi_list_lock);
2992                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2993                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2994                         grant += cli->cl_dirty_grant;
2995                 else
2996                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2997                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2998                 lost_grant = cli->cl_lost_grant;
2999                 cli->cl_lost_grant = 0;
3000                 spin_unlock(&cli->cl_loi_list_lock);
3001
3002                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3003                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3004                        data->ocd_version, data->ocd_grant, lost_grant);
3005         }
3006
3007         RETURN(0);
3008 }
3009 EXPORT_SYMBOL(osc_reconnect);
3010
3011 int osc_disconnect(struct obd_export *exp)
3012 {
3013         struct obd_device *obd = class_exp2obd(exp);
3014         int rc;
3015
3016         rc = client_disconnect_export(exp);
3017         /**
3018          * Initially we put del_shrink_grant before disconnect_export, but it
3019          * causes the following problem if setup (connect) and cleanup
3020          * (disconnect) are tangled together.
3021          *      connect p1                     disconnect p2
3022          *   ptlrpc_connect_import
3023          *     ...............               class_manual_cleanup
3024          *                                     osc_disconnect
3025          *                                     del_shrink_grant
3026          *   ptlrpc_connect_interrupt
3027          *     osc_init_grant
3028          *   add this client to shrink list
3029          *                                      cleanup_osc
3030          * Bang! grant shrink thread trigger the shrink. BUG18662
3031          */
3032         osc_del_grant_list(&obd->u.cli);
3033         return rc;
3034 }
3035 EXPORT_SYMBOL(osc_disconnect);
3036
3037 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3038                                  struct hlist_node *hnode, void *arg)
3039 {
3040         struct lu_env *env = arg;
3041         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3042         struct ldlm_lock *lock;
3043         struct osc_object *osc = NULL;
3044         ENTRY;
3045
3046         lock_res(res);
3047         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3048                 if (lock->l_ast_data != NULL && osc == NULL) {
3049                         osc = lock->l_ast_data;
3050                         cl_object_get(osc2cl(osc));
3051                 }
3052
3053                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3054                  * by the 2nd round of ldlm_namespace_clean() call in
3055                  * osc_import_event(). */
3056                 ldlm_clear_cleaned(lock);
3057         }
3058         unlock_res(res);
3059
3060         if (osc != NULL) {
3061                 osc_object_invalidate(env, osc);
3062                 cl_object_put(env, osc2cl(osc));
3063         }
3064
3065         RETURN(0);
3066 }
3067 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3068
3069 static int osc_import_event(struct obd_device *obd,
3070                             struct obd_import *imp,
3071                             enum obd_import_event event)
3072 {
3073         struct client_obd *cli;
3074         int rc = 0;
3075
3076         ENTRY;
3077         LASSERT(imp->imp_obd == obd);
3078
3079         switch (event) {
3080         case IMP_EVENT_DISCON: {
3081                 cli = &obd->u.cli;
3082                 spin_lock(&cli->cl_loi_list_lock);
3083                 cli->cl_avail_grant = 0;
3084                 cli->cl_lost_grant = 0;
3085                 spin_unlock(&cli->cl_loi_list_lock);
3086                 break;
3087         }
3088         case IMP_EVENT_INACTIVE: {
3089                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3090                 break;
3091         }
3092         case IMP_EVENT_INVALIDATE: {
3093                 struct ldlm_namespace *ns = obd->obd_namespace;
3094                 struct lu_env         *env;
3095                 __u16                  refcheck;
3096
3097                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3098
3099                 env = cl_env_get(&refcheck);
3100                 if (!IS_ERR(env)) {
3101                         osc_io_unplug(env, &obd->u.cli, NULL);
3102
3103                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3104                                                  osc_ldlm_resource_invalidate,
3105                                                  env, 0);
3106                         cl_env_put(env, &refcheck);
3107
3108                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3109                 } else
3110                         rc = PTR_ERR(env);
3111                 break;
3112         }
3113         case IMP_EVENT_ACTIVE: {
3114                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3115                 break;
3116         }
3117         case IMP_EVENT_OCD: {
3118                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3119
3120                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3121                         osc_init_grant(&obd->u.cli, ocd);
3122
3123                 /* See bug 7198 */
3124                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3125                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3126
3127                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3128                 break;
3129         }
3130         case IMP_EVENT_DEACTIVATE: {
3131                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3132                 break;
3133         }
3134         case IMP_EVENT_ACTIVATE: {
3135                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3136                 break;
3137         }
3138         default:
3139                 CERROR("Unknown import event %d\n", event);
3140                 LBUG();
3141         }
3142         RETURN(rc);
3143 }
3144
3145 /**
3146  * Determine whether the lock can be canceled before replaying the lock
3147  * during recovery, see bug16774 for detailed information.
3148  *
3149  * \retval zero the lock can't be canceled
3150  * \retval other ok to cancel
3151  */
3152 static int osc_cancel_weight(struct ldlm_lock *lock)
3153 {
3154         /*
3155          * Cancel all unused and granted extent lock.
3156          */
3157         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3158             lock->l_granted_mode == lock->l_req_mode &&
3159             osc_ldlm_weigh_ast(lock) == 0)
3160                 RETURN(1);
3161
3162         RETURN(0);
3163 }
3164
3165 static int brw_queue_work(const struct lu_env *env, void *data)
3166 {
3167         struct client_obd *cli = data;
3168
3169         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3170
3171         osc_io_unplug(env, cli, NULL);
3172         RETURN(0);
3173 }
3174
3175 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3176 {
3177         struct client_obd *cli = &obd->u.cli;
3178         void *handler;
3179         int rc;
3180
3181         ENTRY;
3182
3183         rc = ptlrpcd_addref();
3184         if (rc)
3185                 RETURN(rc);
3186
3187         rc = client_obd_setup(obd, lcfg);
3188         if (rc)
3189                 GOTO(out_ptlrpcd, rc);
3190
3191
3192         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3193         if (IS_ERR(handler))
3194                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3195         cli->cl_writeback_work = handler;
3196
3197         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3198         if (IS_ERR(handler))
3199                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3200         cli->cl_lru_work = handler;
3201
3202         rc = osc_quota_setup(obd);
3203         if (rc)
3204                 GOTO(out_ptlrpcd_work, rc);
3205
3206         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3207         osc_update_next_shrink(cli);
3208
3209         RETURN(rc);
3210
3211 out_ptlrpcd_work:
3212         if (cli->cl_writeback_work != NULL) {
3213                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3214                 cli->cl_writeback_work = NULL;
3215         }
3216         if (cli->cl_lru_work != NULL) {
3217                 ptlrpcd_destroy_work(cli->cl_lru_work);
3218                 cli->cl_lru_work = NULL;
3219         }
3220         client_obd_cleanup(obd);
3221 out_ptlrpcd:
3222         ptlrpcd_decref();
3223         RETURN(rc);
3224 }
3225 EXPORT_SYMBOL(osc_setup_common);
3226
3227 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3228 {
3229         struct client_obd *cli = &obd->u.cli;
3230         int                adding;
3231         int                added;
3232         int                req_count;
3233         int                rc;
3234
3235         ENTRY;
3236
3237         rc = osc_setup_common(obd, lcfg);
3238         if (rc < 0)
3239                 RETURN(rc);
3240
3241         rc = osc_tunables_init(obd);
3242         if (rc)
3243                 RETURN(rc);
3244
3245         /*
3246          * We try to control the total number of requests with a upper limit
3247          * osc_reqpool_maxreqcount. There might be some race which will cause
3248          * over-limit allocation, but it is fine.
3249          */
3250         req_count = atomic_read(&osc_pool_req_count);
3251         if (req_count < osc_reqpool_maxreqcount) {
3252                 adding = cli->cl_max_rpcs_in_flight + 2;
3253                 if (req_count + adding > osc_reqpool_maxreqcount)
3254                         adding = osc_reqpool_maxreqcount - req_count;
3255
3256                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3257                 atomic_add(added, &osc_pool_req_count);
3258         }
3259
3260         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3261
3262         spin_lock(&osc_shrink_lock);
3263         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3264         spin_unlock(&osc_shrink_lock);
3265         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3266         cli->cl_import->imp_idle_debug = D_HA;
3267
3268         RETURN(0);
3269 }
3270
3271 int osc_precleanup_common(struct obd_device *obd)
3272 {
3273         struct client_obd *cli = &obd->u.cli;
3274         ENTRY;
3275
3276         /* LU-464
3277          * for echo client, export may be on zombie list, wait for
3278          * zombie thread to cull it, because cli.cl_import will be
3279          * cleared in client_disconnect_export():
3280          *   class_export_destroy() -> obd_cleanup() ->
3281          *   echo_device_free() -> echo_client_cleanup() ->
3282          *   obd_disconnect() -> osc_disconnect() ->
3283          *   client_disconnect_export()
3284          */
3285         obd_zombie_barrier();
3286         if (cli->cl_writeback_work) {
3287                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3288                 cli->cl_writeback_work = NULL;
3289         }
3290
3291         if (cli->cl_lru_work) {
3292                 ptlrpcd_destroy_work(cli->cl_lru_work);
3293                 cli->cl_lru_work = NULL;
3294         }
3295
3296         obd_cleanup_client_import(obd);
3297         RETURN(0);
3298 }
3299 EXPORT_SYMBOL(osc_precleanup_common);
3300
3301 static int osc_precleanup(struct obd_device *obd)
3302 {
3303         ENTRY;
3304
3305         osc_precleanup_common(obd);
3306
3307         ptlrpc_lprocfs_unregister_obd(obd);
3308         RETURN(0);
3309 }
3310
3311 int osc_cleanup_common(struct obd_device *obd)
3312 {
3313         struct client_obd *cli = &obd->u.cli;
3314         int rc;
3315
3316         ENTRY;
3317
3318         spin_lock(&osc_shrink_lock);
3319         list_del(&cli->cl_shrink_list);
3320         spin_unlock(&osc_shrink_lock);
3321
3322         /* lru cleanup */
3323         if (cli->cl_cache != NULL) {
3324                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3325                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3326                 list_del_init(&cli->cl_lru_osc);
3327                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3328                 cli->cl_lru_left = NULL;
3329                 cl_cache_decref(cli->cl_cache);
3330                 cli->cl_cache = NULL;
3331         }
3332
3333         /* free memory of osc quota cache */
3334         osc_quota_cleanup(obd);
3335
3336         rc = client_obd_cleanup(obd);
3337
3338         ptlrpcd_decref();
3339         RETURN(rc);
3340 }
3341 EXPORT_SYMBOL(osc_cleanup_common);
3342
3343 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3344 {
3345         ssize_t count  = class_modify_config(lcfg, PARAM_OSC,
3346                                              &obd->obd_kset.kobj);
3347         return count > 0 ? 0 : count;
3348 }
3349
3350 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3351 {
3352         return osc_process_config_base(obd, buf);
3353 }
3354
3355 static struct obd_ops osc_obd_ops = {
3356         .o_owner                = THIS_MODULE,
3357         .o_setup                = osc_setup,
3358         .o_precleanup           = osc_precleanup,
3359         .o_cleanup              = osc_cleanup_common,
3360         .o_add_conn             = client_import_add_conn,
3361         .o_del_conn             = client_import_del_conn,
3362         .o_connect              = client_connect_import,
3363         .o_reconnect            = osc_reconnect,
3364         .o_disconnect           = osc_disconnect,
3365         .o_statfs               = osc_statfs,
3366         .o_statfs_async         = osc_statfs_async,
3367         .o_create               = osc_create,
3368         .o_destroy              = osc_destroy,
3369         .o_getattr              = osc_getattr,
3370         .o_setattr              = osc_setattr,
3371         .o_iocontrol            = osc_iocontrol,
3372         .o_set_info_async       = osc_set_info_async,
3373         .o_import_event         = osc_import_event,
3374         .o_process_config       = osc_process_config,
3375         .o_quotactl             = osc_quotactl,
3376 };
3377
3378 static struct shrinker *osc_cache_shrinker;
3379 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3380 DEFINE_SPINLOCK(osc_shrink_lock);
3381
3382 #ifndef HAVE_SHRINKER_COUNT
3383 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3384 {
3385         struct shrink_control scv = {
3386                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3387                 .gfp_mask   = shrink_param(sc, gfp_mask)
3388         };
3389 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3390         struct shrinker *shrinker = NULL;
3391 #endif
3392
3393         (void)osc_cache_shrink_scan(shrinker, &scv);
3394
3395         return osc_cache_shrink_count(shrinker, &scv);
3396 }
3397 #endif
3398
3399 static int __init osc_init(void)
3400 {
3401         bool enable_proc = true;
3402         struct obd_type *type;
3403         unsigned int reqpool_size;
3404         unsigned int reqsize;
3405         int rc;
3406         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3407                          osc_cache_shrink_count, osc_cache_shrink_scan);
3408         ENTRY;
3409
3410         /* print an address of _any_ initialized kernel symbol from this
3411          * module, to allow debugging with gdb that doesn't support data
3412          * symbols from modules.*/
3413         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3414
3415         rc = lu_kmem_init(osc_caches);
3416         if (rc)
3417                 RETURN(rc);
3418
3419         type = class_search_type(LUSTRE_OSP_NAME);
3420         if (type != NULL && type->typ_procsym != NULL)
3421                 enable_proc = false;
3422
3423         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3424                                  LUSTRE_OSC_NAME, &osc_device_type);
3425         if (rc)
3426                 GOTO(out_kmem, rc);
3427
3428         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3429
3430         /* This is obviously too much memory, only prevent overflow here */
3431         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3432                 GOTO(out_type, rc = -EINVAL);
3433
3434         reqpool_size = osc_reqpool_mem_max << 20;
3435
3436         reqsize = 1;
3437         while (reqsize < OST_IO_MAXREQSIZE)
3438                 reqsize = reqsize << 1;
3439
3440         /*
3441          * We don't enlarge the request count in OSC pool according to
3442          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3443          * tried after normal allocation failed. So a small OSC pool won't
3444          * cause much performance degression in most of cases.
3445          */
3446         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3447
3448         atomic_set(&osc_pool_req_count, 0);
3449         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3450                                           ptlrpc_add_rqs_to_pool);
3451
3452         if (osc_rq_pool == NULL)
3453                 GOTO(out_type, rc = -ENOMEM);
3454
3455         rc = osc_start_grant_work();
3456         if (rc != 0)
3457                 GOTO(out_req_pool, rc);
3458
3459         RETURN(rc);
3460
3461 out_req_pool:
3462         ptlrpc_free_rq_pool(osc_rq_pool);
3463 out_type:
3464         class_unregister_type(LUSTRE_OSC_NAME);
3465 out_kmem:
3466         lu_kmem_fini(osc_caches);
3467
3468         RETURN(rc);
3469 }
3470
3471 static void __exit osc_exit(void)
3472 {
3473         osc_stop_grant_work();
3474         remove_shrinker(osc_cache_shrinker);
3475         class_unregister_type(LUSTRE_OSC_NAME);
3476         lu_kmem_fini(osc_caches);
3477         ptlrpc_free_rq_pool(osc_rq_pool);
3478 }
3479
3480 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3481 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3482 MODULE_VERSION(LUSTRE_VERSION_STRING);
3483 MODULE_LICENSE("GPL");
3484
3485 module_init(osc_init);
3486 module_exit(osc_exit);