LU-12759 osc: don't re-enable grant shrink on reconnect
lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

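/*
 * Illustrative usage sketch (disabled; not part of the build): how a caller
 * might drive osc_setattr_async() fire-and-forget vs. with a completion
 * upcall.  my_setattr_done/my_setattr_example are hypothetical names.
 */
#if 0
static int my_setattr_done(void *cookie, int rc)
{
        /* Runs from osc_setattr_interpret() once the reply (or error)
         * arrives; cookie is whatever was passed to osc_setattr_async(). */
        CDEBUG(D_INODE, "setattr finished: rc = %d\n", rc);
        return rc;
}

static void my_setattr_example(struct obd_export *exp, struct obdo *oa)
{
        /* rqset == NULL: send via ptlrpcd and never look at the reply. */
        osc_setattr_async(exp, oa, NULL, NULL, NULL);

        /* PTLRPCD_SET: send via ptlrpcd but run the upcall on completion. */
        osc_setattr_async(exp, oa, my_setattr_done, NULL, PTLRPCD_SET);
}
#endif
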
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode on the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC was never supported,
         * in which we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

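/*
 * Example of the race osc_can_send_destroy() tolerates, assuming
 * cl_max_rpcs_in_flight == 8 and 8 destroys already in flight: thread A
 * increments the counter to 9 and will return 0, but before its
 * atomic_dec_return() runs, a destroy RPC completes and
 * osc_destroy_interpret() decrements the counter to 8.  A's decrement then
 * brings it to 7 (< 8), so A wakes cl_destroy_waitq itself rather than
 * losing the wakeup.
 */
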
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_long_read() calls and the atomic_inc() are not
                 * covered by a lock, so they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

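/*
 * Worked example for the o_undirty computation above, assuming 4KiB pages,
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8: nrpages =
 * 256 * (8 + 1) = 2304 pages (9MiB), so without GRANT_PARAM the client asks
 * for 9MiB of undirty grant (more if cl_dirty_max_pages is larger).  With
 * GRANT_PARAM and cl_max_extent_pages = 2304, one extent tax is added on top.
 */
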
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

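/*
 * Worked example for osc_shrink_grant() above, assuming 4KiB pages,
 * cl_max_pages_per_rpc = 256 (1MiB RPCs) and cl_max_rpcs_in_flight = 8:
 * the first target is (8 + 1) * 1MiB = 9MiB.  If cl_avail_grant is already
 * at or below 9MiB, the target drops to a single RPC's worth, 1MiB.
 */
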
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

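/*
 * Worked example for the GRANT_PARAM path in osc_init_grant(), assuming
 * PAGE_SHIFT = 12 and ocd_grant_blkbits = 16 (64KiB server blocks):
 * cl_chunkbits = max(12, 16) = 16, so a chunk is 16 pages and
 * chunk_mask = ~15.  A cl_max_pages_per_rpc of 100 is then rounded up to
 * (100 + 15) & ~15 = 112, the next multiple of 16 pages.
 */
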
/* We assume that the reason this OSC got a short read is that it read beyond
 * the end of a stripe file; i.e. Lustre is reading a sparse file via the LOV,
 * and it _knows_ it's reading inside the file, it's just that this stripe
 * never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

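/*
 * Worked example for handle_short_read() above, assuming three full 4KiB
 * pages requested but nob_read = 6000: page 0 is fully covered (4096 bytes),
 * page 1 had EOF inside it so its last 4096 - 1904 = 2192 bytes are zeroed,
 * and page 2 is zero-filled entirely by the second loop.
 */
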
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return p1->off + p1->count == p2->off;
}

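/*
 * Example for can_merge_pages() above: two 4KiB pages at offsets 0 and 4096
 * with identical flags merge into a single niobuf of length 8192, while the
 * same pages with a hole between them (offsets 0 and 8192), or with flags
 * that differ outside the "safe" mask, start a new niobuf instead.
 */
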
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because these requests are asynchronous.
         * Fortunately, the oa passed in contains valid o_uid and o_gid for
         * these two operations. Besides, filling o_uid and o_gid is enough
         * for nrs-tbf; see LU-9658. OBD_MD_FLUID and OBD_MD_FLGID are not
         * set, in order to avoid breaking other processing logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
1406         if (desc != NULL)
1407                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1408         else /* short io */
1409                 ioobj_max_brw_set(ioobj, 0);
1410
1411         if (short_io_size != 0) {
1412                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1413                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1414                         body->oa.o_flags = 0;
1415                 }
1416                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1417                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1418                        short_io_size);
1419                 if (opc == OST_WRITE) {
1420                         short_io_buf = req_capsule_client_get(pill,
1421                                                               &RMF_SHORT_IO);
1422                         LASSERT(short_io_buf != NULL);
1423                 }
1424         }
1425
1426         LASSERT(page_count > 0);
1427         pg_prev = pga[0];
1428         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1429                 struct brw_page *pg = pga[i];
1430                 int poff = pg->off & ~PAGE_MASK;
1431
1432                 LASSERT(pg->count > 0);
1433                 /* make sure there is no gap in the middle of page array */
1434                 LASSERTF(page_count == 1 ||
1435                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1436                           ergo(i > 0 && i < page_count - 1,
1437                                poff == 0 && pg->count == PAGE_SIZE)   &&
1438                           ergo(i == page_count - 1, poff == 0)),
1439                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1440                          i, page_count, pg, pg->off, pg->count);
1441                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1442                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1443                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1444                          i, page_count,
1445                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1446                          pg_prev->pg, page_private(pg_prev->pg),
1447                          pg_prev->pg->index, pg_prev->off);
1448                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1449                         (pg->flag & OBD_BRW_SRVLOCK));
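                /* For a short io write, copy the page data inline into the
                 * request buffer; otherwise attach the page to the bulk
                 * descriptor as a kiov fragment. */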
1450                 if (short_io_size != 0 && opc == OST_WRITE) {
1451                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1452
1453                         LASSERT(short_io_size >= requested_nob + pg->count);
1454                         memcpy(short_io_buf + requested_nob,
1455                                ptr + poff,
1456                                pg->count);
1457                         ll_kunmap_atomic(ptr, KM_USER0);
1458                 } else if (short_io_size == 0) {
1459                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1460                                                          pg->count);
1461                 }
1462                 requested_nob += pg->count;
1463
1464                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1465                         niobuf--;
1466                         niobuf->rnb_len += pg->count;
1467                 } else {
1468                         niobuf->rnb_offset = pg->off;
1469                         niobuf->rnb_len    = pg->count;
1470                         niobuf->rnb_flags  = pg->flag;
1471                 }
1472                 pg_prev = pg;
1473         }
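        /* For example, two contiguous 4096-byte chunks at file offsets 0 and
         * 4096 are merged by can_merge_pages() into a single remote niobuf
         * with rnb_offset = 0 and rnb_len = 8192 (assuming their brw flags
         * match). */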
1474
1475         LASSERTF((void *)(niobuf - niocount) ==
1476                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1477                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1478                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1479
1480         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1481         if (resend) {
1482                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1483                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1484                         body->oa.o_flags = 0;
1485                 }
1486                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1487         }
1488
1489         if (osc_should_shrink_grant(cli))
1490                 osc_shrink_grant_local(cli, &body->oa);
1491
1492         /* size[REQ_REC_OFF] is still sizeof(*body) */
1493         if (opc == OST_WRITE) {
1494                 if (cli->cl_checksum &&
1495                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1496                         /* store cl_cksum_type in a local variable since
1497                          * it can be changed via lprocfs */
1498                         enum cksum_types cksum_type = cli->cl_cksum_type;
1499
1500                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1501                                 body->oa.o_flags = 0;
1502
1503                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1504                                                                 cksum_type);
1505                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1506
1507                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1508                                                   requested_nob, page_count,
1509                                                   pga, OST_WRITE,
1510                                                   &body->oa.o_cksum);
1511                         if (rc < 0) {
1512                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1513                                        rc);
1514                                 GOTO(out, rc);
1515                         }
1516                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1517                                body->oa.o_cksum);
1518
1519                         /* save this in 'oa', too, for later checking */
1520                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1521                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1522                                                            cksum_type);
1523                 } else {
1524                         /* clear out the checksum flag, in case this is a
1525                          * resend but cl_checksum is no longer set. b=11238 */
1526                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1527                 }
1528                 oa->o_cksum = body->oa.o_cksum;
1529                 /* 1 RC per niobuf */
1530                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1531                                      sizeof(__u32) * niocount);
1532         } else {
1533                 if (cli->cl_checksum &&
1534                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1535                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1536                                 body->oa.o_flags = 0;
1537                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1538                                 cli->cl_cksum_type);
1539                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1540                 }
1541
1542                 /* The client cksum has already been copied to the wire obdo
1543                  * in the previous lustre_set_wire_obdo(); if a bulk-read is
1544                  * being resent due to a cksum error, this allows the server
1545                  * to check and dump the pages on its side. */
1546         }
1547         ptlrpc_request_set_replen(req);
1548
1549         aa = ptlrpc_req_async_args(aa, req);
1550         aa->aa_oa = oa;
1551         aa->aa_requested_nob = requested_nob;
1552         aa->aa_nio_count = niocount;
1553         aa->aa_page_count = page_count;
1554         aa->aa_resends = 0;
1555         aa->aa_ppga = pga;
1556         aa->aa_cli = cli;
1557         INIT_LIST_HEAD(&aa->aa_oaps);
1558
1559         *reqp = req;
1560         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1561         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1562                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1563                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1564         RETURN(0);
1565
1566  out:
1567         ptlrpc_req_finished(req);
1568         RETURN(rc);
1569 }
1570
1571 char dbgcksum_file_name[PATH_MAX];
1572
1573 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1574                                 struct brw_page **pga, __u32 server_cksum,
1575                                 __u32 client_cksum)
1576 {
1577         struct file *filp;
1578         int rc, i;
1579         unsigned int len;
1580         char *buf;
1581
1582         /* Only keep a dump of the pages for the first error on a given
1583          * file/fid range; resends/retries of the same range are not dumped. */
1584         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1585                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1586                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1587                   libcfs_debug_file_path_arr :
1588                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1591                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1592                  pga[0]->off,
1593                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1594                  client_cksum, server_cksum);
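        /* The resulting name looks something like the following, with
         * hypothetical FID, offset and checksum values:
         * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-5a5a5a5a-a5a5a5a5 */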
1595         filp = filp_open(dbgcksum_file_name,
1596                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1597         if (IS_ERR(filp)) {
1598                 rc = PTR_ERR(filp);
1599                 if (rc == -EEXIST)
1600                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1601                                "checksum error: rc = %d\n", dbgcksum_file_name,
1602                                rc);
1603                 else
1604                         CERROR("%s: can't open to dump pages with checksum "
1605                                "error: rc = %d\n", dbgcksum_file_name, rc);
1606                 return;
1607         }
1608
1609         for (i = 0; i < page_count; i++) {
1610                 len = pga[i]->count;
1611                 buf = kmap(pga[i]->pg);
1612                 while (len != 0) {
1613                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1614                         if (rc < 0) {
1615                                 CERROR("%s: wanted to write %u bytes but got "
1616                                        "error %d\n", dbgcksum_file_name, len, rc);
1617                                 break;
1618                         }
1619                         len -= rc;
1620                         buf += rc;
1621                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1622                                dbgcksum_file_name, rc);
1623                 }
1624                 kunmap(pga[i]->pg);
1625         }
1626
1627         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1628         if (rc)
1629                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1630         filp_close(filp, NULL);
1631         return;
1632 }
1633
1634 static int
1635 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1636                      __u32 client_cksum, __u32 server_cksum,
1637                      struct osc_brw_async_args *aa)
1638 {
1639         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1640         enum cksum_types cksum_type;
1641         obd_dif_csum_fn *fn = NULL;
1642         int sector_size = 0;
1643         __u32 new_cksum;
1644         char *msg;
1645         int rc;
1646
1647         if (server_cksum == client_cksum) {
1648                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1649                 return 0;
1650         }
1651
1652         if (aa->aa_cli->cl_checksum_dump)
1653                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1654                                     server_cksum, client_cksum);
1655
1656         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1657                                            oa->o_flags : 0);
1658
1659         switch (cksum_type) {
1660         case OBD_CKSUM_T10IP512:
1661                 fn = obd_dif_ip_fn;
1662                 sector_size = 512;
1663                 break;
1664         case OBD_CKSUM_T10IP4K:
1665                 fn = obd_dif_ip_fn;
1666                 sector_size = 4096;
1667                 break;
1668         case OBD_CKSUM_T10CRC512:
1669                 fn = obd_dif_crc_fn;
1670                 sector_size = 512;
1671                 break;
1672         case OBD_CKSUM_T10CRC4K:
1673                 fn = obd_dif_crc_fn;
1674                 sector_size = 4096;
1675                 break;
1676         default:
1677                 break;
1678         }
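        /* fn stays NULL for the non-T10 checksum types (e.g. crc32, adler,
         * crc32c); those are handled by osc_checksum_bulk() below. */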
1679
1680         if (fn)
1681                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1682                                              aa->aa_page_count, aa->aa_ppga,
1683                                              OST_WRITE, fn, sector_size,
1684                                              &new_cksum);
1685         else
1686                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1687                                        aa->aa_ppga, OST_WRITE, cksum_type,
1688                                        &new_cksum);
1689
1690         if (rc < 0)
1691                 msg = "failed to calculate the client write checksum";
1692         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1693                 msg = "the server did not use the checksum type specified in "
1694                       "the original request - likely a protocol problem";
1695         else if (new_cksum == server_cksum)
1696                 msg = "changed on the client after we checksummed it - "
1697                       "likely false positive due to mmap IO (bug 11742)";
1698         else if (new_cksum == client_cksum)
1699                 msg = "changed in transit before arrival at OST";
1700         else
1701                 msg = "changed in transit AND doesn't match the original - "
1702                       "likely false positive due to mmap IO (bug 11742)";
1703
1704         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1705                            DFID " object "DOSTID" extent [%llu-%llu], original "
1706                            "client csum %x (type %x), server csum %x (type %x),"
1707                            " client csum now %x\n",
1708                            obd_name, msg, libcfs_nid2str(peer->nid),
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1711                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1712                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1713                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1714                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1715                            client_cksum,
1716                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1717                            server_cksum, cksum_type, new_cksum);
1718         return 1;
1719 }
1720
1721 /* Note: rc enters this function as the number of bytes transferred */
1722 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1723 {
1724         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1725         struct client_obd *cli = aa->aa_cli;
1726         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1727         const struct lnet_process_id *peer =
1728                 &req->rq_import->imp_connection->c_peer;
1729         struct ost_body *body;
1730         u32 client_cksum = 0;
1731
1732         ENTRY;
1733
1734         if (rc < 0 && rc != -EDQUOT) {
1735                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1736                 RETURN(rc);
1737         }
1738
1739         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1740         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1741         if (body == NULL) {
1742                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1743                 RETURN(-EPROTO);
1744         }
1745
1746         /* set/clear the over-quota flag for a uid/gid/projid */
1747         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1748             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1749                 unsigned qid[LL_MAXQUOTAS] = {
1750                                          body->oa.o_uid, body->oa.o_gid,
1751                                          body->oa.o_projid };
1752                 CDEBUG(D_QUOTA,
1753                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1754                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1755                        body->oa.o_valid, body->oa.o_flags);
1756                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1757                                 body->oa.o_flags);
1758         }
1759
1760         osc_update_grant(cli, body);
1761
1762         if (rc < 0)
1763                 RETURN(rc);
1764
1765         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1766                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1767
1768         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1769                 if (rc > 0) {
1770                         CERROR("%s: unexpected positive size %d\n",
1771                                obd_name, rc);
1772                         RETURN(-EPROTO);
1773                 }
1774
1775                 if (req->rq_bulk != NULL &&
1776                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1777                         RETURN(-EAGAIN);
1778
1779                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1780                     check_write_checksum(&body->oa, peer, client_cksum,
1781                                          body->oa.o_cksum, aa))
1782                         RETURN(-EAGAIN);
1783
1784                 rc = check_write_rcs(req, aa->aa_requested_nob,
1785                                      aa->aa_nio_count, aa->aa_page_count,
1786                                      aa->aa_ppga);
1787                 GOTO(out, rc);
1788         }
1789
1790         /* The rest of this function executes only for OST_READs */
1791
1792         if (req->rq_bulk == NULL) {
1793                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1794                                           RCL_SERVER);
1795                 LASSERT(rc == req->rq_status);
1796         } else {
1797                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1798                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1799         }
1800         if (rc < 0)
1801                 GOTO(out, rc = -EAGAIN);
1802
1803         if (rc > aa->aa_requested_nob) {
1804                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1805                        rc, aa->aa_requested_nob);
1806                 RETURN(-EPROTO);
1807         }
1808
1809         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1810                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1811                        rc, req->rq_bulk->bd_nob_transferred);
1812                 RETURN(-EPROTO);
1813         }
1814
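        /* For a short io read, the data arrived inline in the reply buffer
         * rather than via bulk; copy it out into the brw pages below. */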
1815         if (req->rq_bulk == NULL) {
1816                 /* short io */
1817                 int nob, pg_count, i = 0;
1818                 unsigned char *buf;
1819
1820                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1821                 pg_count = aa->aa_page_count;
1822                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1823                                                    rc);
1824                 nob = rc;
1825                 while (nob > 0 && pg_count > 0) {
1826                         unsigned char *ptr;
1827                         int count = aa->aa_ppga[i]->count > nob ?
1828                                     nob : aa->aa_ppga[i]->count;
1829
1830                         CDEBUG(D_CACHE, "page %p count %d\n",
1831                                aa->aa_ppga[i]->pg, count);
1832                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1833                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1834                                count);
1835                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1836
1837                         buf += count;
1838                         nob -= count;
1839                         i++;
1840                         pg_count--;
1841                 }
1842         }
1843
1844         if (rc < aa->aa_requested_nob)
1845                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1846
1847         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1848                 static int cksum_counter;
1849                 u32        server_cksum = body->oa.o_cksum;
1850                 char      *via = "";
1851                 char      *router = "";
1852                 enum cksum_types cksum_type;
1853                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1854                         body->oa.o_flags : 0;
1855
1856                 cksum_type = obd_cksum_type_unpack(o_flags);
1857                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1858                                           aa->aa_page_count, aa->aa_ppga,
1859                                           OST_READ, &client_cksum);
1860                 if (rc < 0)
1861                         GOTO(out, rc);
1862
1863                 if (req->rq_bulk != NULL &&
1864                     peer->nid != req->rq_bulk->bd_sender) {
1865                         via = " via ";
1866                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1867                 }
1868
1869                 if (server_cksum != client_cksum) {
1870                         struct ost_body *clbody;
1871                         u32 page_count = aa->aa_page_count;
1872
1873                         clbody = req_capsule_client_get(&req->rq_pill,
1874                                                         &RMF_OST_BODY);
1875                         if (cli->cl_checksum_dump)
1876                                 dump_all_bulk_pages(&clbody->oa, page_count,
1877                                                     aa->aa_ppga, server_cksum,
1878                                                     client_cksum);
1879
1880                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1881                                            "%s%s%s inode "DFID" object "DOSTID
1882                                            " extent [%llu-%llu], client %x, "
1883                                            "server %x, cksum_type %x\n",
1884                                            obd_name,
1885                                            libcfs_nid2str(peer->nid),
1886                                            via, router,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_seq : 0ULL,
1889                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1890                                                 clbody->oa.o_parent_oid : 0,
1891                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1892                                                 clbody->oa.o_parent_ver : 0,
1893                                            POSTID(&body->oa.o_oi),
1894                                            aa->aa_ppga[0]->off,
1895                                            aa->aa_ppga[page_count-1]->off +
1896                                            aa->aa_ppga[page_count-1]->count - 1,
1897                                            client_cksum, server_cksum,
1898                                            cksum_type);
1899                         cksum_counter = 0;
1900                         aa->aa_oa->o_cksum = client_cksum;
1901                         rc = -EAGAIN;
1902                 } else {
1903                         cksum_counter++;
1904                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1905                         rc = 0;
1906                 }
1907         } else if (unlikely(client_cksum)) {
1908                 static int cksum_missed;
1909
1910                 cksum_missed++;
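                /* Rate-limit: x & -x == x only when x is a power of two, so
                 * this logs on the 1st, 2nd, 4th, 8th, ... missed checksum. */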
1911                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1912                         CERROR("%s: checksum %u requested from %s but not sent\n",
1913                                obd_name, cksum_missed,
1914                                libcfs_nid2str(peer->nid));
1915         } else {
1916                 rc = 0;
1917         }
1918 out:
1919         if (rc >= 0)
1920                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1921                                      aa->aa_oa, &body->oa);
1922
1923         RETURN(rc);
1924 }
1925
1926 static int osc_brw_redo_request(struct ptlrpc_request *request,
1927                                 struct osc_brw_async_args *aa, int rc)
1928 {
1929         struct ptlrpc_request *new_req;
1930         struct osc_brw_async_args *new_aa;
1931         struct osc_async_page *oap;
1932         ENTRY;
1933
1934         /* The message below is checked in replay-ost-single.sh test_8ae */
1935         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1936                   "redo for recoverable error %d", rc);
1937
1938         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1939                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1940                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1941                                   aa->aa_ppga, &new_req, 1);
1942         if (rc)
1943                 RETURN(rc);
1944
1945         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1946                 if (oap->oap_request != NULL) {
1947                         LASSERTF(request == oap->oap_request,
1948                                  "request %p != oap_request %p\n",
1949                                  request, oap->oap_request);
1950                         if (oap->oap_interrupted) {
1951                                 ptlrpc_req_finished(new_req);
1952                                 RETURN(-EINTR);
1953                         }
1954                 }
1955         }
1956         /*
1957          * New request takes over pga and oaps from old request.
1958          * Note that copying a list_head doesn't work, need to move it...
1959          */
1960         aa->aa_resends++;
1961         new_req->rq_interpret_reply = request->rq_interpret_reply;
1962         new_req->rq_async_args = request->rq_async_args;
1963         new_req->rq_commit_cb = request->rq_commit_cb;
1964         /* cap resend delay to the current request timeout, this is similar to
1965          * what ptlrpc does (see after_reply()) */
1966         if (aa->aa_resends > new_req->rq_timeout)
1967                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1968         else
1969                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
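        /* e.g. the 3rd resend of a request is delayed 3 seconds; the delay
         * is capped at the request timeout, as noted above. */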
1970         new_req->rq_generation_set = 1;
1971         new_req->rq_import_generation = request->rq_import_generation;
1972
1973         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1974
1975         INIT_LIST_HEAD(&new_aa->aa_oaps);
1976         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1977         INIT_LIST_HEAD(&new_aa->aa_exts);
1978         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1979         new_aa->aa_resends = aa->aa_resends;
1980
1981         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1982                 if (oap->oap_request) {
1983                         ptlrpc_req_finished(oap->oap_request);
1984                         oap->oap_request = ptlrpc_request_addref(new_req);
1985                 }
1986         }
1987
1988         /* XXX: This code will run into problems if we ever support adding
1989          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1990          * waiting for all of them to finish. We should inherit the request
1991          * set from the old request. */
1992         ptlrpcd_add_req(new_req);
1993
1994         DEBUG_REQ(D_INFO, new_req, "new request");
1995         RETURN(0);
1996 }
1997
1998 /*
1999  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2000  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2001  * fine for our small page arrays and doesn't require allocation.  It's an
2002  * insertion sort that swaps elements that are strides apart, shrinking the
2003  * stride down until it's 1 and the array is sorted.
2004  */
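/*
 * For example, with num == 20 the stride seed loop below produces 1, 4, 13, 40;
 * the sort then makes passes with strides 13, 4 and finally 1.
 */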
2005 static void sort_brw_pages(struct brw_page **array, int num)
2006 {
2007         int stride, i, j;
2008         struct brw_page *tmp;
2009
2010         if (num == 1)
2011                 return;
2012         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2013                 ;
2014
2015         do {
2016                 stride /= 3;
2017                 for (i = stride ; i < num ; i++) {
2018                         tmp = array[i];
2019                         j = i;
2020                         while (j >= stride && array[j - stride]->off > tmp->off) {
2021                                 array[j] = array[j - stride];
2022                                 j -= stride;
2023                         }
2024                         array[j] = tmp;
2025                 }
2026         } while (stride > 1);
2027 }
2028
2029 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2030 {
2031         LASSERT(ppga != NULL);
2032         OBD_FREE(ppga, sizeof(*ppga) * count);
2033 }
2034
2035 static int brw_interpret(const struct lu_env *env,
2036                          struct ptlrpc_request *req, void *args, int rc)
2037 {
2038         struct osc_brw_async_args *aa = args;
2039         struct osc_extent *ext;
2040         struct osc_extent *tmp;
2041         struct client_obd *cli = aa->aa_cli;
2042         unsigned long transferred = 0;
2043
2044         ENTRY;
2045
2046         rc = osc_brw_fini_request(req, rc);
2047         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2048         /*
2049          * When server returns -EINPROGRESS, client should always retry
2050          * regardless of the number of times the bulk was resent already.
2051          */
2052         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2053                 if (req->rq_import_generation !=
2054                     req->rq_import->imp_generation) {
2055                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2056                                DOSTID", rc = %d.\n",
2057                                req->rq_import->imp_obd->obd_name,
2058                                POSTID(&aa->aa_oa->o_oi), rc);
2059                 } else if (rc == -EINPROGRESS ||
2060                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2061                         rc = osc_brw_redo_request(req, aa, rc);
2062                 } else {
2063                         CERROR("%s: too many resent retries for object: "
2064                                "%llu:%llu, rc = %d.\n",
2065                                req->rq_import->imp_obd->obd_name,
2066                                POSTID(&aa->aa_oa->o_oi), rc);
2067                 }
2068
2069                 if (rc == 0)
2070                         RETURN(0);
2071                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2072                         rc = -EIO;
2073         }
2074
2075         if (rc == 0) {
2076                 struct obdo *oa = aa->aa_oa;
2077                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2078                 unsigned long valid = 0;
2079                 struct cl_object *obj;
2080                 struct osc_async_page *last;
2081
2082                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2083                 obj = osc2cl(last->oap_obj);
2084
2085                 cl_object_attr_lock(obj);
2086                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2087                         attr->cat_blocks = oa->o_blocks;
2088                         valid |= CAT_BLOCKS;
2089                 }
2090                 if (oa->o_valid & OBD_MD_FLMTIME) {
2091                         attr->cat_mtime = oa->o_mtime;
2092                         valid |= CAT_MTIME;
2093                 }
2094                 if (oa->o_valid & OBD_MD_FLATIME) {
2095                         attr->cat_atime = oa->o_atime;
2096                         valid |= CAT_ATIME;
2097                 }
2098                 if (oa->o_valid & OBD_MD_FLCTIME) {
2099                         attr->cat_ctime = oa->o_ctime;
2100                         valid |= CAT_CTIME;
2101                 }
2102
2103                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2104                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2105                         loff_t last_off = last->oap_count + last->oap_obj_off +
2106                                 last->oap_page_off;
2107
2108                         /* Change the file size if this is an out-of-quota
2109                          * or direct IO write and it extends the file size */
2110                         if (loi->loi_lvb.lvb_size < last_off) {
2111                                 attr->cat_size = last_off;
2112                                 valid |= CAT_SIZE;
2113                         }
2114                         /* Extend KMS if it's not a lockless write */
2115                         if (loi->loi_kms < last_off &&
2116                             oap2osc_page(last)->ops_srvlock == 0) {
2117                                 attr->cat_kms = last_off;
2118                                 valid |= CAT_KMS;
2119                         }
2120                 }
2121
2122                 if (valid != 0)
2123                         cl_object_attr_update(env, obj, attr, valid);
2124                 cl_object_attr_unlock(obj);
2125         }
2126         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2127         aa->aa_oa = NULL;
2128
2129         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2130                 osc_inc_unstable_pages(req);
2131
2132         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2133                 list_del_init(&ext->oe_link);
2134                 osc_extent_finish(env, ext, 1,
2135                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2136         }
2137         LASSERT(list_empty(&aa->aa_exts));
2138         LASSERT(list_empty(&aa->aa_oaps));
2139
2140         transferred = (req->rq_bulk == NULL ? /* short io */
2141                        aa->aa_requested_nob :
2142                        req->rq_bulk->bd_nob_transferred);
2143
2144         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2145         ptlrpc_lprocfs_brw(req, transferred);
2146
2147         spin_lock(&cli->cl_loi_list_lock);
2148         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2149          * is called so we know whether to go to sync BRWs or wait for more
2150          * RPCs to complete */
2151         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2152                 cli->cl_w_in_flight--;
2153         else
2154                 cli->cl_r_in_flight--;
2155         osc_wake_cache_waiters(cli);
2156         spin_unlock(&cli->cl_loi_list_lock);
2157
2158         osc_io_unplug(env, cli, NULL);
2159         RETURN(rc);
2160 }
2161
2162 static void brw_commit(struct ptlrpc_request *req)
2163 {
2164         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2165          * this function, which is called via rq_commit_cb, we need to
2166          * ensure that osc_dec_unstable_pages is still called. Otherwise
2167          * unstable pages may be leaked. */
2168         spin_lock(&req->rq_lock);
2169         if (likely(req->rq_unstable)) {
2170                 req->rq_unstable = 0;
2171                 spin_unlock(&req->rq_lock);
2172
2173                 osc_dec_unstable_pages(req);
2174         } else {
2175                 req->rq_committed = 1;
2176                 spin_unlock(&req->rq_lock);
2177         }
2178 }
2179
2180 /**
2181  * Build an RPC from the list of extents @ext_list. The caller must ensure
2182  * that the total number of pages in this list does not exceed the maximum
2183  * number of pages per RPC.  Extents in the list must be in OES_RPC state.
2184  */
2185 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2186                   struct list_head *ext_list, int cmd)
2187 {
2188         struct ptlrpc_request           *req = NULL;
2189         struct osc_extent               *ext;
2190         struct brw_page                 **pga = NULL;
2191         struct osc_brw_async_args       *aa = NULL;
2192         struct obdo                     *oa = NULL;
2193         struct osc_async_page           *oap;
2194         struct osc_object               *obj = NULL;
2195         struct cl_req_attr              *crattr = NULL;
2196         loff_t                          starting_offset = OBD_OBJECT_EOF;
2197         loff_t                          ending_offset = 0;
2198         int                             mpflag = 0;
2199         int                             mem_tight = 0;
2200         int                             page_count = 0;
2201         bool                            soft_sync = false;
2202         bool                            interrupted = false;
2203         bool                            ndelay = false;
2204         int                             i;
2205         int                             grant = 0;
2206         int                             rc;
2207         __u32                           layout_version = 0;
2208         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2209         struct ost_body                 *body;
2210         ENTRY;
2211         LASSERT(!list_empty(ext_list));
2212
2213         /* add pages into rpc_list to build BRW rpc */
2214         list_for_each_entry(ext, ext_list, oe_link) {
2215                 LASSERT(ext->oe_state == OES_RPC);
2216                 mem_tight |= ext->oe_memalloc;
2217                 grant += ext->oe_grants;
2218                 page_count += ext->oe_nr_pages;
2219                 layout_version = MAX(layout_version, ext->oe_layout_version);
2220                 if (obj == NULL)
2221                         obj = ext->oe_obj;
2222         }
2223
2224         soft_sync = osc_over_unstable_soft_limit(cli);
2225         if (mem_tight)
2226                 mpflag = cfs_memory_pressure_get_and_set();
2227
2228         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2229         if (pga == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2233         if (oa == NULL)
2234                 GOTO(out, rc = -ENOMEM);
2235
2236         i = 0;
2237         list_for_each_entry(ext, ext_list, oe_link) {
2238                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2239                         if (mem_tight)
2240                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2241                         if (soft_sync)
2242                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2243                         pga[i] = &oap->oap_brw_page;
2244                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2245                         i++;
2246
2247                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2248                         if (starting_offset == OBD_OBJECT_EOF ||
2249                             starting_offset > oap->oap_obj_off)
2250                                 starting_offset = oap->oap_obj_off;
2251                         else
2252                                 LASSERT(oap->oap_page_off == 0);
2253                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2254                                 ending_offset = oap->oap_obj_off +
2255                                                 oap->oap_count;
2256                         else
2257                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2258                                         PAGE_SIZE);
2259                         if (oap->oap_interrupted)
2260                                 interrupted = true;
2261                 }
2262                 if (ext->oe_ndelay)
2263                         ndelay = true;
2264         }
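        /* At this point [starting_offset, ending_offset) covers the whole RPC;
         * the LASSERTs above guarantee that every interior page is a full
         * page, so only the first and last pages may be partial. */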
2265
2266         /* first page in the list */
2267         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2268
2269         crattr = &osc_env_info(env)->oti_req_attr;
2270         memset(crattr, 0, sizeof(*crattr));
2271         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2272         crattr->cra_flags = ~0ULL;
2273         crattr->cra_page = oap2cl_page(oap);
2274         crattr->cra_oa = oa;
2275         cl_req_attr_set(env, osc2cl(obj), crattr);
2276
2277         if (cmd == OBD_BRW_WRITE) {
2278                 oa->o_grant_used = grant;
2279                 if (layout_version > 0) {
2280                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2281                                PFID(&oa->o_oi.oi_fid), layout_version);
2282
2283                         oa->o_layout_version = layout_version;
2284                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2285                 }
2286         }
2287
2288         sort_brw_pages(pga, page_count);
2289         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2290         if (rc != 0) {
2291                 CERROR("prep_req failed: %d\n", rc);
2292                 GOTO(out, rc);
2293         }
2294
2295         req->rq_commit_cb = brw_commit;
2296         req->rq_interpret_reply = brw_interpret;
2297         req->rq_memalloc = mem_tight != 0;
2298         oap->oap_request = ptlrpc_request_addref(req);
2299         if (interrupted && !req->rq_intr)
2300                 ptlrpc_mark_interrupted(req);
2301         if (ndelay) {
2302                 req->rq_no_resend = req->rq_no_delay = 1;
2303                 /* We should probably set a shorter timeout value here
2304                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2305                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2306         }
2307
2308         /* Need to update the timestamps after the request is built in case
2309          * we race with setattr (locally or in queue at the OST).  If the OST
2310          * gets the later setattr before the earlier BRW (as determined by the
2311          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2312          * is no obvious way to do this in a single call.  bug 10150 */
2313         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2314         crattr->cra_oa = &body->oa;
2315         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2316         cl_req_attr_set(env, osc2cl(obj), crattr);
2317         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2318
2319         aa = ptlrpc_req_async_args(aa, req);
2320         INIT_LIST_HEAD(&aa->aa_oaps);
2321         list_splice_init(&rpc_list, &aa->aa_oaps);
2322         INIT_LIST_HEAD(&aa->aa_exts);
2323         list_splice_init(ext_list, &aa->aa_exts);
2324
2325         spin_lock(&cli->cl_loi_list_lock);
2326         starting_offset >>= PAGE_SHIFT;
2327         if (cmd == OBD_BRW_READ) {
2328                 cli->cl_r_in_flight++;
2329                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2330                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2331                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2332                                       starting_offset + 1);
2333         } else {
2334                 cli->cl_w_in_flight++;
2335                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2336                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2337                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2338                                       starting_offset + 1);
2339         }
2340         spin_unlock(&cli->cl_loi_list_lock);
2341
2342         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2343                   page_count, aa, cli->cl_r_in_flight,
2344                   cli->cl_w_in_flight);
2345         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2346
2347         ptlrpcd_add_req(req);
2348         rc = 0;
2349         EXIT;
2350
2351 out:
2352         if (mem_tight != 0)
2353                 cfs_memory_pressure_restore(mpflag);
2354
2355         if (rc != 0) {
2356                 LASSERT(req == NULL);
2357
2358                 if (oa)
2359                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2360                 if (pga)
2361                         OBD_FREE(pga, sizeof(*pga) * page_count);
2362                 /* this should happen rarely and is pretty bad; it makes the
2363                  * pending list not follow the dirty order */
2364                 while (!list_empty(ext_list)) {
2365                         ext = list_entry(ext_list->next, struct osc_extent,
2366                                          oe_link);
2367                         list_del_init(&ext->oe_link);
2368                         osc_extent_finish(env, ext, 0, rc);
2369                 }
2370         }
2371         RETURN(rc);
2372 }
2373
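/* Set @data as @lock's l_ast_data if it is not set yet.  Returns 1 if
 * l_ast_data now points at @data (newly set or already matching), and 0 if
 * the lock already carries different ast data. */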
2374 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2375 {
2376         int set = 0;
2377
2378         LASSERT(lock != NULL);
2379
2380         lock_res_and_lock(lock);
2381
2382         if (lock->l_ast_data == NULL)
2383                 lock->l_ast_data = data;
2384         if (lock->l_ast_data == data)
2385                 set = 1;
2386
2387         unlock_res_and_lock(lock);
2388
2389         return set;
2390 }
2391
2392 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2393                      void *cookie, struct lustre_handle *lockh,
2394                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2395                      int errcode)
2396 {
2397         bool intent = *flags & LDLM_FL_HAS_INTENT;
2398         int rc;
2399         ENTRY;
2400
2401         /* The request was created before ldlm_cli_enqueue call. */
2402         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2403                 struct ldlm_reply *rep;
2404
2405                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2406                 LASSERT(rep != NULL);
2407
2408                 rep->lock_policy_res1 =
2409                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2410                 if (rep->lock_policy_res1)
2411                         errcode = rep->lock_policy_res1;
2412                 if (!speculative)
2413                         *flags |= LDLM_FL_LVB_READY;
2414         } else if (errcode == ELDLM_OK) {
2415                 *flags |= LDLM_FL_LVB_READY;
2416         }
2417
2418         /* Call the update callback. */
2419         rc = (*upcall)(cookie, lockh, errcode);
2420
2421         /* release the reference taken in ldlm_cli_enqueue() */
2422         if (errcode == ELDLM_LOCK_MATCHED)
2423                 errcode = ELDLM_OK;
2424         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2425                 ldlm_lock_decref(lockh, mode);
2426
2427         RETURN(rc);
2428 }
2429
2430 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2431                           void *args, int rc)
2432 {
2433         struct osc_enqueue_args *aa = args;
2434         struct ldlm_lock *lock;
2435         struct lustre_handle *lockh = &aa->oa_lockh;
2436         enum ldlm_mode mode = aa->oa_mode;
2437         struct ost_lvb *lvb = aa->oa_lvb;
2438         __u32 lvb_len = sizeof(*lvb);
2439         __u64 flags = 0;
2440
2441         ENTRY;
2442
2443         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2444          * be valid. */
2445         lock = ldlm_handle2lock(lockh);
2446         LASSERTF(lock != NULL,
2447                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2448                  lockh->cookie, req, aa);
2449
2450         /* Take an additional reference so that a blocking AST that
2451          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2452          * to arrive after an upcall has been executed by
2453          * osc_enqueue_fini(). */
2454         ldlm_lock_addref(lockh, mode);
2455
2456         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2457         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2458
2459         /* Let the CP AST grant the lock first. */
2460         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2461
2462         if (aa->oa_speculative) {
2463                 LASSERT(aa->oa_lvb == NULL);
2464                 LASSERT(aa->oa_flags == NULL);
2465                 aa->oa_flags = &flags;
2466         }
2467
2468         /* Complete obtaining the lock procedure. */
2469         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2470                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2471                                    lockh, rc);
2472         /* Complete osc stuff. */
2473         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2474                               aa->oa_flags, aa->oa_speculative, rc);
2475
2476         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2477
2478         ldlm_lock_decref(lockh, mode);
2479         LDLM_LOCK_PUT(lock);
2480         RETURN(rc);
2481 }
2482
2483 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2484
2485 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2486  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2487  * with other synchronous requests, but holding some locks while trying to
2488  * obtain others may take a considerable amount of time in the case of OST
2489  * failure; and when a client does not release a lock that other sync requests
2490  * are waiting on, the client is evicted from the cluster -- such scenarios
2491  * make life difficult, so release locks just after they are obtained. */
2492 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2493                      __u64 *flags, union ldlm_policy_data *policy,
2494                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2495                      void *cookie, struct ldlm_enqueue_info *einfo,
2496                      struct ptlrpc_request_set *rqset, int async,
2497                      bool speculative)
2498 {
2499         struct obd_device *obd = exp->exp_obd;
2500         struct lustre_handle lockh = { 0 };
2501         struct ptlrpc_request *req = NULL;
2502         int intent = *flags & LDLM_FL_HAS_INTENT;
2503         __u64 match_flags = *flags;
2504         enum ldlm_mode mode;
2505         int rc;
2506         ENTRY;
2507
2508         /* Filesystem lock extents are extended to page boundaries so that
2509          * dealing with the page cache is a little smoother.  */
2510         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2511         policy->l_extent.end |= ~PAGE_MASK;
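        /* For example, with PAGE_SIZE == 4096 an extent [5000, 6000] is
         * widened here to the page-aligned extent [4096, 8191]. */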
2512
2513         /* Next, search for already existing extent locks that will cover us */
2514         /* If we're trying to read, we also search for an existing PW lock.  The
2515          * VFS and page cache already protect us locally, so lots of readers/
2516          * writers can share a single PW lock.
2517          *
2518          * There are problems with conversion deadlocks, so instead of
2519          * converting a read lock to a write lock, we'll just enqueue a new
2520          * one.
2521          *
2522          * At some point we should cancel the read lock instead of making them
2523          * send us a blocking callback, but there are problems with canceling
2524          * locks out from other users right now, too. */
2525         mode = einfo->ei_mode;
2526         if (einfo->ei_mode == LCK_PR)
2527                 mode |= LCK_PW;
2528         /* Normal lock requests must wait for the LVB to be ready before
2529          * matching a lock; speculative lock requests do not need to,
2530          * because they will not actually use the lock. */
2531         if (!speculative)
2532                 match_flags |= LDLM_FL_LVB_READY;
2533         if (intent != 0)
2534                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2535         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2536                                einfo->ei_type, policy, mode, &lockh, 0);
2537         if (mode) {
2538                 struct ldlm_lock *matched;
2539
2540                 if (*flags & LDLM_FL_TEST_LOCK)
2541                         RETURN(ELDLM_OK);
2542
2543                 matched = ldlm_handle2lock(&lockh);
2544                 if (speculative) {
2545                         /* This DLM lock request is speculative, and does not
2546                          * have an associated IO request. Therefore, if there
2547                          * is already a DLM lock, it will just inform the
2548                          * caller to cancel the request for this stripe. */
2549                         lock_res_and_lock(matched);
2550                         if (ldlm_extent_equal(&policy->l_extent,
2551                             &matched->l_policy_data.l_extent))
2552                                 rc = -EEXIST;
2553                         else
2554                                 rc = -ECANCELED;
2555                         unlock_res_and_lock(matched);
2556
2557                         ldlm_lock_decref(&lockh, mode);
2558                         LDLM_LOCK_PUT(matched);
2559                         RETURN(rc);
2560                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2561                         *flags |= LDLM_FL_LVB_READY;
2562
2563                         /* We already have a lock, and it's referenced. */
2564                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2565
2566                         ldlm_lock_decref(&lockh, mode);
2567                         LDLM_LOCK_PUT(matched);
2568                         RETURN(ELDLM_OK);
2569                 } else {
2570                         ldlm_lock_decref(&lockh, mode);
2571                         LDLM_LOCK_PUT(matched);
2572                 }
2573         }
2574
2575         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2576                 RETURN(-ENOLCK);
2577
2578         if (intent) {
2579                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2580                                            &RQF_LDLM_ENQUEUE_LVB);
2581                 if (req == NULL)
2582                         RETURN(-ENOMEM);
2583
2584                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2585                 if (rc) {
2586                         ptlrpc_request_free(req);
2587                         RETURN(rc);
2588                 }
2589
2590                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2591                                      sizeof(*lvb));
2592                 ptlrpc_request_set_replen(req);
2593         }
2594
2595         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2596         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2597
2598         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2599                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2600         if (async) {
2601                 if (!rc) {
2602                         struct osc_enqueue_args *aa;
2603                         aa = ptlrpc_req_async_args(aa, req);
2604                         aa->oa_exp         = exp;
2605                         aa->oa_mode        = einfo->ei_mode;
2606                         aa->oa_type        = einfo->ei_type;
2607                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2608                         aa->oa_upcall      = upcall;
2609                         aa->oa_cookie      = cookie;
2610                         aa->oa_speculative = speculative;
2611                         if (!speculative) {
2612                                 aa->oa_flags  = flags;
2613                                 aa->oa_lvb    = lvb;
2614                         } else {
2615                                 /* speculative locks essentially enqueue
2616                                  * a DLM lock in advance, so we don't care
2617                                  * about the result of the enqueue. */
2618                                 aa->oa_lvb    = NULL;
2619                                 aa->oa_flags  = NULL;
2620                         }
2621
2622                         req->rq_interpret_reply = osc_enqueue_interpret;
2623                         if (rqset == PTLRPCD_SET)
2624                                 ptlrpcd_add_req(req);
2625                         else
2626                                 ptlrpc_set_add_req(rqset, req);
2627                 } else if (intent) {
2628                         ptlrpc_req_finished(req);
2629                 }
2630                 RETURN(rc);
2631         }
2632
2633         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2634                               flags, speculative, rc);
2635         if (intent)
2636                 ptlrpc_req_finished(req);
2637
2638         RETURN(rc);
2639 }
2640
2641 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2642                    struct ldlm_res_id *res_id, enum ldlm_type type,
2643                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2644                    __u64 *flags, struct osc_object *obj,
2645                    struct lustre_handle *lockh, int unref)
2646 {
2647         struct obd_device *obd = exp->exp_obd;
2648         __u64 lflags = *flags;
2649         enum ldlm_mode rc;
2650         ENTRY;
2651
2652         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2653                 RETURN(-EIO);
2654
2655         /* Filesystem lock extents are extended to page boundaries so that
2656          * dealing with the page cache is a little smoother */
2657         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2658         policy->l_extent.end |= ~PAGE_MASK;
2659
2660         /* Next, search for already existing extent locks that will cover us */
2661         /* If we're trying to read, we also search for an existing PW lock.  The
2662          * VFS and page cache already protect us locally, so lots of readers/
2663          * writers can share a single PW lock. */
2664         rc = mode;
2665         if (mode == LCK_PR)
2666                 rc |= LCK_PW;
2667         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2668                              res_id, type, policy, rc, lockh, unref);
2669         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2670                 RETURN(rc);
2671
2672         if (obj != NULL) {
2673                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2674
2675                 LASSERT(lock != NULL);
2676                 if (osc_set_lock_data(lock, obj)) {
2677                         lock_res_and_lock(lock);
2678                         if (!ldlm_is_lvb_cached(lock)) {
2679                                 LASSERT(lock->l_ast_data == obj);
2680                                 osc_lock_lvb_update(env, obj, lock, NULL);
2681                                 ldlm_set_lvb_cached(lock);
2682                         }
2683                         unlock_res_and_lock(lock);
2684                 } else {
2685                         ldlm_lock_decref(lockh, rc);
2686                         rc = 0;
2687                 }
2688                 LDLM_LOCK_PUT(lock);
2689         }
2690         RETURN(rc);
2691 }
2692
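/*
 * Completion handler for an asynchronous OST_STATFS request: copy the
 * reply into the caller's obd_statfs buffer and invoke the oi_cb_up
 * upcall.  -ENOTCONN and -EAGAIN are masked to success for
 * OBD_STATFS_NODELAY callers, which asked not to wait for an
 * unreachable target.
 */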
2693 static int osc_statfs_interpret(const struct lu_env *env,
2694                                 struct ptlrpc_request *req, void *args, int rc)
2695 {
2696         struct osc_async_args *aa = args;
2697         struct obd_statfs *msfs;
2698
2699         ENTRY;
2700         if (rc == -EBADR)
2701                 /*
2702                  * The request has in fact never been sent due to issues at
2703                  * a higher level (LOV).  Exit immediately since the caller
2704                  * is aware of the problem and takes care of the clean up.
2705                  */
2706                 RETURN(rc);
2707
2708         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2709             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2710                 GOTO(out, rc = 0);
2711
2712         if (rc != 0)
2713                 GOTO(out, rc);
2714
2715         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2716         if (msfs == NULL)
2717                 GOTO(out, rc = -EPROTO);
2718
2719         *aa->aa_oi->oi_osfs = *msfs;
2720 out:
2721         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2722
2723         RETURN(rc);
2724 }
2725
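/*
 * Non-blocking statfs: results newer than @max_age are served from the
 * cached obd_osfs, otherwise an OST_STATFS request is added to @rqset
 * and osc_statfs_interpret() finishes the job.  A hypothetical caller
 * sketch (my_cb and osfs are illustrative, not part of this file):
 *
 *	struct obd_statfs osfs;
 *	struct obd_info oi = { .oi_osfs = &osfs, .oi_cb_up = my_cb };
 *
 *	rc = osc_statfs_async(exp, &oi, ktime_get_seconds() - 1, rqset);
 */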
2726 static int osc_statfs_async(struct obd_export *exp,
2727                             struct obd_info *oinfo, time64_t max_age,
2728                             struct ptlrpc_request_set *rqset)
2729 {
2730         struct obd_device     *obd = class_exp2obd(exp);
2731         struct ptlrpc_request *req;
2732         struct osc_async_args *aa;
2733         int rc;
2734         ENTRY;
2735
2736         if (obd->obd_osfs_age >= max_age) {
2737                 CDEBUG(D_SUPER,
2738                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2739                        obd->obd_name, &obd->obd_osfs,
2740                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2741                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2742                 spin_lock(&obd->obd_osfs_lock);
2743                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2744                 spin_unlock(&obd->obd_osfs_lock);
2745                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2746                 if (oinfo->oi_cb_up)
2747                         oinfo->oi_cb_up(oinfo, 0);
2748
2749                 RETURN(0);
2750         }
2751
2752         /* We could possibly pass max_age in the request (as an absolute
2753          * timestamp or a "seconds.usec ago") so the target can avoid doing
2754          * extra calls into the filesystem if that isn't necessary (e.g.
2755          * during mount that would help a bit).  Having relative timestamps
2756          * is not so great if request processing is slow, while absolute
2757          * timestamps are not ideal because they need time synchronization. */
2758         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2759         if (req == NULL)
2760                 RETURN(-ENOMEM);
2761
2762         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2763         if (rc) {
2764                 ptlrpc_request_free(req);
2765                 RETURN(rc);
2766         }
2767         ptlrpc_request_set_replen(req);
2768         req->rq_request_portal = OST_CREATE_PORTAL;
2769         ptlrpc_at_set_req_timeout(req);
2770
2771         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2772                 /* procfs requests should not wait for statfs, to avoid deadlock */
2773                 req->rq_no_resend = 1;
2774                 req->rq_no_delay = 1;
2775         }
2776
2777         req->rq_interpret_reply = osc_statfs_interpret;
2778         aa = ptlrpc_req_async_args(aa, req);
2779         aa->aa_oi = oinfo;
2780
2781         ptlrpc_set_add_req(rqset, req);
2782         RETURN(0);
2783 }
2784
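/*
 * Synchronous statfs: same RPC as osc_statfs_async() but queued and
 * waited for in place.  The import reference is taken under cl_sem so
 * a concurrent client_disconnect_export() cannot free it underneath us.
 */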
2785 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2786                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2787 {
2788         struct obd_device     *obd = class_exp2obd(exp);
2789         struct obd_statfs     *msfs;
2790         struct ptlrpc_request *req;
2791         struct obd_import     *imp = NULL;
2792         int rc;
2793         ENTRY;
2794
2795
2796         /* Since the request might also come from lprocfs, we need to
2797          * sync this with client_disconnect_export(). Bug15684 */
2798         down_read(&obd->u.cli.cl_sem);
2799         if (obd->u.cli.cl_import)
2800                 imp = class_import_get(obd->u.cli.cl_import);
2801         up_read(&obd->u.cli.cl_sem);
2802         if (!imp)
2803                 RETURN(-ENODEV);
2804
2805         /* We could possibly pass max_age in the request (as an absolute
2806          * timestamp or a "seconds.usec ago") so the target can avoid doing
2807          * extra calls into the filesystem if that isn't necessary (e.g.
2808          * during mount that would help a bit).  Having relative timestamps
2809          * is not so great if request processing is slow, while absolute
2810          * timestamps are not ideal because they need time synchronization. */
2811         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2812
2813         class_import_put(imp);
2814
2815         if (req == NULL)
2816                 RETURN(-ENOMEM);
2817
2818         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2819         if (rc) {
2820                 ptlrpc_request_free(req);
2821                 RETURN(rc);
2822         }
2823         ptlrpc_request_set_replen(req);
2824         req->rq_request_portal = OST_CREATE_PORTAL;
2825         ptlrpc_at_set_req_timeout(req);
2826
2827         if (flags & OBD_STATFS_NODELAY) {
2828                 /* procfs requests should not wait for statfs, to avoid deadlock */
2829                 req->rq_no_resend = 1;
2830                 req->rq_no_delay = 1;
2831         }
2832
2833         rc = ptlrpc_queue_wait(req);
2834         if (rc)
2835                 GOTO(out, rc);
2836
2837         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2838         if (msfs == NULL)
2839                 GOTO(out, rc = -EPROTO);
2840
2841         *osfs = *msfs;
2842
2843         EXIT;
2844 out:
2845         ptlrpc_req_finished(req);
2846         return rc;
2847 }
2848
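/*
 * OSC ioctl handler.  Only OBD_IOC_CLIENT_RECOVER (force recovery of
 * the import) and IOC_OSC_SET_ACTIVE (administratively activate or
 * deactivate the import) are recognised; anything else returns -ENOTTY.
 * The module reference pins osc.ko for the duration of the call.
 */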
2849 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2850                          void *karg, void __user *uarg)
2851 {
2852         struct obd_device *obd = exp->exp_obd;
2853         struct obd_ioctl_data *data = karg;
2854         int rc = 0;
2855
2856         ENTRY;
2857         if (!try_module_get(THIS_MODULE)) {
2858                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2859                        module_name(THIS_MODULE));
2860                 return -EINVAL;
2861         }
2862         switch (cmd) {
2863         case OBD_IOC_CLIENT_RECOVER:
2864                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2865                                            data->ioc_inlbuf1, 0);
2866                 if (rc > 0)
2867                         rc = 0;
2868                 break;
2869         case IOC_OSC_SET_ACTIVE:
2870                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2871                                               data->ioc_offset);
2872                 break;
2873         default:
2874                 rc = -ENOTTY;
2875                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2876                        obd->obd_name, cmd, current_comm(), rc);
2877                 break;
2878         }
2879
2880         module_put(THIS_MODULE);
2881         return rc;
2882 }
2883
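/*
 * Set a named parameter locally or on the OST.  Checksum, sptlrpc and
 * LRU-shrink keys are handled on the client without an RPC;
 * KEY_GRANT_SHRINK goes through ptlrpcd with an interpret callback, and
 * everything else is packed into an OST_SET_INFO request on @set.  A
 * hypothetical local-key call (illustrative only):
 *
 *	int on = 1;
 *
 *	rc = osc_set_info_async(env, exp, strlen(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 */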
2884 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2885                        u32 keylen, void *key, u32 vallen, void *val,
2886                        struct ptlrpc_request_set *set)
2887 {
2888         struct ptlrpc_request *req;
2889         struct obd_device     *obd = exp->exp_obd;
2890         struct obd_import     *imp = class_exp2cliimp(exp);
2891         char                  *tmp;
2892         int                    rc;
2893         ENTRY;
2894
2895         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2896
2897         if (KEY_IS(KEY_CHECKSUM)) {
2898                 if (vallen != sizeof(int))
2899                         RETURN(-EINVAL);
2900                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2901                 RETURN(0);
2902         }
2903
2904         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2905                 sptlrpc_conf_client_adapt(obd);
2906                 RETURN(0);
2907         }
2908
2909         if (KEY_IS(KEY_FLUSH_CTX)) {
2910                 sptlrpc_import_flush_my_ctx(imp);
2911                 RETURN(0);
2912         }
2913
2914         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2915                 struct client_obd *cli = &obd->u.cli;
2916                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2917                 long target = *(long *)val;
2918
2919                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2920                 *(long *)val -= nr;
2921                 RETURN(0);
2922         }
2923
2924         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2925                 RETURN(-EINVAL);
2926
2927         /* We pass all other commands directly to the OST. Since nobody calls
2928          * osc methods directly and everybody is supposed to go through LOV,
2929          * we assume LOV checked invalid values for us.
2930          * The only recognised values so far are evict_by_nid and mds_conn.
2931          * Even if something bad gets through, we'd get -EINVAL from the OST
2932          * anyway. */
2933
2934         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2935                                                 &RQF_OST_SET_GRANT_INFO :
2936                                                 &RQF_OBD_SET_INFO);
2937         if (req == NULL)
2938                 RETURN(-ENOMEM);
2939
2940         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2941                              RCL_CLIENT, keylen);
2942         if (!KEY_IS(KEY_GRANT_SHRINK))
2943                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2944                                      RCL_CLIENT, vallen);
2945         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2946         if (rc) {
2947                 ptlrpc_request_free(req);
2948                 RETURN(rc);
2949         }
2950
2951         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2952         memcpy(tmp, key, keylen);
2953         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2954                                                         &RMF_OST_BODY :
2955                                                         &RMF_SETINFO_VAL);
2956         memcpy(tmp, val, vallen);
2957
2958         if (KEY_IS(KEY_GRANT_SHRINK)) {
2959                 struct osc_grant_args *aa;
2960                 struct obdo *oa;
2961
2962                 aa = ptlrpc_req_async_args(aa, req);
2963                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2964                 if (!oa) {
2965                         ptlrpc_req_finished(req);
2966                         RETURN(-ENOMEM);
2967                 }
2968                 *oa = ((struct ost_body *)val)->oa;
2969                 aa->aa_oa = oa;
2970                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2971         }
2972
2973         ptlrpc_request_set_replen(req);
2974         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2975                 LASSERT(set != NULL);
2976                 ptlrpc_set_add_req(set, req);
2977                 ptlrpc_check_set(NULL, set);
2978         } else {
2979                 ptlrpcd_add_req(req);
2980         }
2981
2982         RETURN(0);
2983 }
2984 EXPORT_SYMBOL(osc_set_info_async);
2985
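/*
 * Refresh connect data before a reconnect: report the grant this client
 * still holds (available + reserved + dirty) so the server can restore
 * it, and clear cl_lost_grant once it has been logged.
 */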
2986 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2987                   struct obd_device *obd, struct obd_uuid *cluuid,
2988                   struct obd_connect_data *data, void *localdata)
2989 {
2990         struct client_obd *cli = &obd->u.cli;
2991
2992         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2993                 long lost_grant;
2994                 long grant;
2995
2996                 spin_lock(&cli->cl_loi_list_lock);
2997                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2998                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2999                         /* restore ocd_grant_blkbits as client page bits */
3000                         data->ocd_grant_blkbits = PAGE_SHIFT;
3001                         grant += cli->cl_dirty_grant;
3002                 } else {
3003                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3004                 }
3005                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3006                 lost_grant = cli->cl_lost_grant;
3007                 cli->cl_lost_grant = 0;
3008                 spin_unlock(&cli->cl_loi_list_lock);
3009
3010                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3011                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3012                        data->ocd_version, data->ocd_grant, lost_grant);
3013         }
3014
3015         RETURN(0);
3016 }
3017 EXPORT_SYMBOL(osc_reconnect);
3018
3019 int osc_disconnect(struct obd_export *exp)
3020 {
3021         struct obd_device *obd = class_exp2obd(exp);
3022         int rc;
3023
3024         rc = client_disconnect_export(exp);
3025         /**
3026          * Initially we put del_shrink_grant before disconnect_export, but it
3027          * causes the following problem if setup (connect) and cleanup
3028          * (disconnect) are tangled together.
3029          *      connect p1                     disconnect p2
3030          *   ptlrpc_connect_import
3031          *     ...............               class_manual_cleanup
3032          *                                     osc_disconnect
3033          *                                     del_shrink_grant
3034          *   ptlrpc_connect_interpret
3035          *     osc_init_grant
3036          *   add this client to shrink list
3037          *                                      cleanup_osc
3038          * Bang! The grant shrink thread triggers the shrink. BUG18662
3039          */
3040         osc_del_grant_list(&obd->u.cli);
3041         return rc;
3042 }
3043 EXPORT_SYMBOL(osc_disconnect);
3044
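/*
 * cfs_hash iterator used on import invalidation: clear LDLM_FL_CLEANED
 * on every granted lock of the resource so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() cancels it, then
 * invalidate the osc_object those locks were caching.
 */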
3045 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3046                                  struct hlist_node *hnode, void *arg)
3047 {
3048         struct lu_env *env = arg;
3049         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3050         struct ldlm_lock *lock;
3051         struct osc_object *osc = NULL;
3052         ENTRY;
3053
3054         lock_res(res);
3055         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3056                 if (lock->l_ast_data != NULL && osc == NULL) {
3057                         osc = lock->l_ast_data;
3058                         cl_object_get(osc2cl(osc));
3059                 }
3060
3061                 /* clear LDLM_FL_CLEANED flag to make sure it will be
3062                  * canceled by the 2nd round of ldlm_namespace_cleanup()
3063                  * in osc_import_event(). */
3064                 ldlm_clear_cleaned(lock);
3065         }
3066         unlock_res(res);
3067
3068         if (osc != NULL) {
3069                 osc_object_invalidate(env, osc);
3070                 cl_object_put(env, osc2cl(osc));
3071         }
3072
3073         RETURN(0);
3074 }
3075 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3076
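/*
 * React to import state changes: zero the grant on disconnect, flush
 * cached pages and cancel local locks on invalidation, pick up grant
 * and request-portal settings from fresh connect data, and forward
 * (in)activation events to the observer (typically the LOV).
 */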
3077 static int osc_import_event(struct obd_device *obd,
3078                             struct obd_import *imp,
3079                             enum obd_import_event event)
3080 {
3081         struct client_obd *cli;
3082         int rc = 0;
3083
3084         ENTRY;
3085         LASSERT(imp->imp_obd == obd);
3086
3087         switch (event) {
3088         case IMP_EVENT_DISCON: {
3089                 cli = &obd->u.cli;
3090                 spin_lock(&cli->cl_loi_list_lock);
3091                 cli->cl_avail_grant = 0;
3092                 cli->cl_lost_grant = 0;
3093                 spin_unlock(&cli->cl_loi_list_lock);
3094                 break;
3095         }
3096         case IMP_EVENT_INACTIVE: {
3097                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3098                 break;
3099         }
3100         case IMP_EVENT_INVALIDATE: {
3101                 struct ldlm_namespace *ns = obd->obd_namespace;
3102                 struct lu_env         *env;
3103                 __u16                  refcheck;
3104
3105                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3106
3107                 env = cl_env_get(&refcheck);
3108                 if (!IS_ERR(env)) {
3109                         osc_io_unplug(env, &obd->u.cli, NULL);
3110
3111                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3112                                                  osc_ldlm_resource_invalidate,
3113                                                  env, 0);
3114                         cl_env_put(env, &refcheck);
3115
3116                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3117                 } else
3118                         rc = PTR_ERR(env);
3119                 break;
3120         }
3121         case IMP_EVENT_ACTIVE: {
3122                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3123                 break;
3124         }
3125         case IMP_EVENT_OCD: {
3126                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3127
3128                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3129                         osc_init_grant(&obd->u.cli, ocd);
3130
3131                 /* See bug 7198 */
3132                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3133                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3134
3135                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3136                 break;
3137         }
3138         case IMP_EVENT_DEACTIVATE: {
3139                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3140                 break;
3141         }
3142         case IMP_EVENT_ACTIVATE: {
3143                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3144                 break;
3145         }
3146         default:
3147                 CERROR("Unknown import event %d\n", event);
3148                 LBUG();
3149         }
3150         RETURN(rc);
3151 }
3152
3153 /**
3154  * Determine whether the lock can be canceled before replaying the lock
3155  * during recovery, see bug16774 for detailed information.
3156  *
3157  * \retval zero the lock can't be canceled
3158  * \retval other ok to cancel
3159  */
3160 static int osc_cancel_weight(struct ldlm_lock *lock)
3161 {
3162         /*
3163          * Cancel all unused and granted extent locks.
3164          */
3165         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3166             ldlm_is_granted(lock) &&
3167             osc_ldlm_weigh_ast(lock) == 0)
3168                 RETURN(1);
3169
3170         RETURN(0);
3171 }
3172
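/*
 * ptlrpcd work callback, bound to cl_writeback_work in
 * osc_setup_common(); it pushes pending cached I/O out to the wire.
 */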
3173 static int brw_queue_work(const struct lu_env *env, void *data)
3174 {
3175         struct client_obd *cli = data;
3176
3177         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3178
3179         osc_io_unplug(env, cli, NULL);
3180         RETURN(0);
3181 }
3182
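/*
 * Device setup shared by OSC-like clients: take a ptlrpcd reference,
 * initialise the client obd, allocate the writeback and LRU work items
 * and set up quota and grant shrink state.
 */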
3183 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3184 {
3185         struct client_obd *cli = &obd->u.cli;
3186         void *handler;
3187         int rc;
3188
3189         ENTRY;
3190
3191         rc = ptlrpcd_addref();
3192         if (rc)
3193                 RETURN(rc);
3194
3195         rc = client_obd_setup(obd, lcfg);
3196         if (rc)
3197                 GOTO(out_ptlrpcd, rc);
3198
3199
3200         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3201         if (IS_ERR(handler))
3202                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3203         cli->cl_writeback_work = handler;
3204
3205         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3206         if (IS_ERR(handler))
3207                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3208         cli->cl_lru_work = handler;
3209
3210         rc = osc_quota_setup(obd);
3211         if (rc)
3212                 GOTO(out_ptlrpcd_work, rc);
3213
3214         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3215         osc_update_next_shrink(cli);
3216
3217         RETURN(rc);
3218
3219 out_ptlrpcd_work:
3220         if (cli->cl_writeback_work != NULL) {
3221                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3222                 cli->cl_writeback_work = NULL;
3223         }
3224         if (cli->cl_lru_work != NULL) {
3225                 ptlrpcd_destroy_work(cli->cl_lru_work);
3226                 cli->cl_lru_work = NULL;
3227         }
3228         client_obd_cleanup(obd);
3229 out_ptlrpcd:
3230         ptlrpcd_decref();
3231         RETURN(rc);
3232 }
3233 EXPORT_SYMBOL(osc_setup_common);
3234
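/*
 * Full OSC setup on top of osc_setup_common(): register tunables, grow
 * the shared request pool by about cl_max_rpcs_in_flight requests
 * (bounded by osc_reqpool_maxreqcount across all OSC devices), register
 * the cancel-weight callback and join the grant shrink list.
 */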
3235 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3236 {
3237         struct client_obd *cli = &obd->u.cli;
3238         int                adding;
3239         int                added;
3240         int                req_count;
3241         int                rc;
3242
3243         ENTRY;
3244
3245         rc = osc_setup_common(obd, lcfg);
3246         if (rc < 0)
3247                 RETURN(rc);
3248
3249         rc = osc_tunables_init(obd);
3250         if (rc)
3251                 RETURN(rc);
3252
3253         /*
3254          * We try to control the total number of requests with an upper limit
3255          * osc_reqpool_maxreqcount. There might be some race which will cause
3256          * over-limit allocation, but it is fine.
3257          */
3258         req_count = atomic_read(&osc_pool_req_count);
3259         if (req_count < osc_reqpool_maxreqcount) {
3260                 adding = cli->cl_max_rpcs_in_flight + 2;
3261                 if (req_count + adding > osc_reqpool_maxreqcount)
3262                         adding = osc_reqpool_maxreqcount - req_count;
3263
3264                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3265                 atomic_add(added, &osc_pool_req_count);
3266         }
3267
3268         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3269
3270         spin_lock(&osc_shrink_lock);
3271         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3272         spin_unlock(&osc_shrink_lock);
3273         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3274         cli->cl_import->imp_idle_debug = D_HA;
3275
3276         RETURN(0);
3277 }
3278
3279 int osc_precleanup_common(struct obd_device *obd)
3280 {
3281         struct client_obd *cli = &obd->u.cli;
3282         ENTRY;
3283
3284         /* LU-464
3285          * for echo client, export may be on zombie list, wait for
3286          * zombie thread to cull it, because cli.cl_import will be
3287          * cleared in client_disconnect_export():
3288          *   class_export_destroy() -> obd_cleanup() ->
3289          *   echo_device_free() -> echo_client_cleanup() ->
3290          *   obd_disconnect() -> osc_disconnect() ->
3291          *   client_disconnect_export()
3292          */
3293         obd_zombie_barrier();
3294         if (cli->cl_writeback_work) {
3295                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3296                 cli->cl_writeback_work = NULL;
3297         }
3298
3299         if (cli->cl_lru_work) {
3300                 ptlrpcd_destroy_work(cli->cl_lru_work);
3301                 cli->cl_lru_work = NULL;
3302         }
3303
3304         obd_cleanup_client_import(obd);
3305         RETURN(0);
3306 }
3307 EXPORT_SYMBOL(osc_precleanup_common);
3308
3309 static int osc_precleanup(struct obd_device *obd)
3310 {
3311         ENTRY;
3312
3313         osc_precleanup_common(obd);
3314
3315         ptlrpc_lprocfs_unregister_obd(obd);
3316         RETURN(0);
3317 }
3318
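/*
 * Undo osc_setup()/osc_setup_common(): leave the grant shrink list,
 * detach from the shared LRU cache, free the quota state, then clean
 * up the client obd and drop the ptlrpcd reference.
 */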
3319 int osc_cleanup_common(struct obd_device *obd)
3320 {
3321         struct client_obd *cli = &obd->u.cli;
3322         int rc;
3323
3324         ENTRY;
3325
3326         spin_lock(&osc_shrink_lock);
3327         list_del(&cli->cl_shrink_list);
3328         spin_unlock(&osc_shrink_lock);
3329
3330         /* lru cleanup */
3331         if (cli->cl_cache != NULL) {
3332                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3333                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3334                 list_del_init(&cli->cl_lru_osc);
3335                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3336                 cli->cl_lru_left = NULL;
3337                 cl_cache_decref(cli->cl_cache);
3338                 cli->cl_cache = NULL;
3339         }
3340
3341         /* free memory of osc quota cache */
3342         osc_quota_cleanup(obd);
3343
3344         rc = client_obd_cleanup(obd);
3345
3346         ptlrpcd_decref();
3347         RETURN(rc);
3348 }
3349 EXPORT_SYMBOL(osc_cleanup_common);
3350
3351 static struct obd_ops osc_obd_ops = {
3352         .o_owner                = THIS_MODULE,
3353         .o_setup                = osc_setup,
3354         .o_precleanup           = osc_precleanup,
3355         .o_cleanup              = osc_cleanup_common,
3356         .o_add_conn             = client_import_add_conn,
3357         .o_del_conn             = client_import_del_conn,
3358         .o_connect              = client_connect_import,
3359         .o_reconnect            = osc_reconnect,
3360         .o_disconnect           = osc_disconnect,
3361         .o_statfs               = osc_statfs,
3362         .o_statfs_async         = osc_statfs_async,
3363         .o_create               = osc_create,
3364         .o_destroy              = osc_destroy,
3365         .o_getattr              = osc_getattr,
3366         .o_setattr              = osc_setattr,
3367         .o_iocontrol            = osc_iocontrol,
3368         .o_set_info_async       = osc_set_info_async,
3369         .o_import_event         = osc_import_event,
3370         .o_quotactl             = osc_quotactl,
3371 };
3372
3373 static struct shrinker *osc_cache_shrinker;
3374 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3375 DEFINE_SPINLOCK(osc_shrink_lock);
3376
3377 #ifndef HAVE_SHRINKER_COUNT
3378 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3379 {
3380         struct shrink_control scv = {
3381                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3382                 .gfp_mask   = shrink_param(sc, gfp_mask)
3383         };
3384 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3385         struct shrinker *shrinker = NULL;
3386 #endif
3387
3388         (void)osc_cache_shrink_scan(shrinker, &scv);
3389
3390         return osc_cache_shrink_count(shrinker, &scv);
3391 }
3392 #endif
3393
3394 static int __init osc_init(void)
3395 {
3396         unsigned int reqpool_size;
3397         unsigned int reqsize;
3398         int rc;
3399         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3400                          osc_cache_shrink_count, osc_cache_shrink_scan);
3401         ENTRY;
3402
3403         /* print an address of _any_ initialized kernel symbol from this
3404          * module, to allow debugging with gdb that doesn't support data
3405          * symbols from modules. */
3406         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3407
3408         rc = lu_kmem_init(osc_caches);
3409         if (rc)
3410                 RETURN(rc);
3411
3412         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3413                                  LUSTRE_OSC_NAME, &osc_device_type);
3414         if (rc)
3415                 GOTO(out_kmem, rc);
3416
3417         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3418
3419         /* This is obviously too much memory; we only prevent overflow here */
3420         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3421                 GOTO(out_type, rc = -EINVAL);
3422
3423         reqpool_size = osc_reqpool_mem_max << 20;
3424
3425         reqsize = 1;
3426         while (reqsize < OST_IO_MAXREQSIZE)
3427                 reqsize = reqsize << 1;
3428
3429         /*
3430          * We don't enlarge the request count in the OSC pool according to
3431          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3432          * after a normal allocation has failed, so a small OSC pool won't
3433          * cause much performance degradation in most cases.
3434          */
3435         osc_reqpool_maxreqcount = reqpool_size / reqsize;
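        /*
         * Illustrative sizing (the numbers are an assumption, not taken
         * from this tree): with the default osc_reqpool_mem_max of 5 MB
         * and OST_IO_MAXREQSIZE rounding up to a 64 KB reqsize, the pool
         * is capped at (5 << 20) / 65536 = 80 requests.
         */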
3436
3437         atomic_set(&osc_pool_req_count, 0);
3438         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3439                                           ptlrpc_add_rqs_to_pool);
3440
3441         if (osc_rq_pool == NULL)
3442                 GOTO(out_type, rc = -ENOMEM);
3443
3444         rc = osc_start_grant_work();
3445         if (rc != 0)
3446                 GOTO(out_req_pool, rc);
3447
3448         RETURN(rc);
3449
3450 out_req_pool:
3451         ptlrpc_free_rq_pool(osc_rq_pool);
3452 out_type:
3453         class_unregister_type(LUSTRE_OSC_NAME);
3454 out_kmem:
3455         lu_kmem_fini(osc_caches);
3456
3457         RETURN(rc);
3458 }
3459
3460 static void __exit osc_exit(void)
3461 {
3462         osc_stop_grant_work();
3463         remove_shrinker(osc_cache_shrinker);
3464         class_unregister_type(LUSTRE_OSC_NAME);
3465         lu_kmem_fini(osc_caches);
3466         ptlrpc_free_rq_pool(osc_rq_pool);
3467 }
3468
3469 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3470 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3471 MODULE_VERSION(LUSTRE_VERSION_STRING);
3472 MODULE_LICENSE("GPL");
3473
3474 module_init(osc_init);
3475 module_exit(osc_exit);