Whamcloud - gitweb
LU-12477 lustre: remove obsolete config checks
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
54
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
58
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
61
/* Grant RPCs carry their async arguments in a struct osc_brw_async_args. */
#define osc_grant_args	osc_brw_async_args
63
64 struct osc_setattr_args {
65         struct obdo             *sa_oa;
66         obd_enqueue_update_f     sa_upcall;
67         void                    *sa_cookie;
68 };
69
70 struct osc_fsync_args {
71         struct osc_object       *fa_obj;
72         struct obdo             *fa_oa;
73         obd_enqueue_update_f    fa_upcall;
74         void                    *fa_cookie;
75 };
76
77 struct osc_ladvise_args {
78         struct obdo             *la_oa;
79         obd_enqueue_update_f     la_upcall;
80         void                    *la_cookie;
81 };
82
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
85                          void *data, int rc);
86
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
88 {
89         struct ost_body *body;
90
91         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
92         LASSERT(body);
93
94         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
95 }
96
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
98                        struct obdo *oa)
99 {
100         struct ptlrpc_request   *req;
101         struct ost_body         *body;
102         int                      rc;
103
104         ENTRY;
105         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
106         if (req == NULL)
107                 RETURN(-ENOMEM);
108
109         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
110         if (rc) {
111                 ptlrpc_request_free(req);
112                 RETURN(rc);
113         }
114
115         osc_pack_req_body(req, oa);
116
117         ptlrpc_request_set_replen(req);
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
124         if (body == NULL)
125                 GOTO(out, rc = -EPROTO);
126
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
129
130         oa->o_blksize = cli_brw_size(exp->exp_obd);
131         oa->o_valid |= OBD_MD_FLBLKSZ;
132
133         EXIT;
134 out:
135         ptlrpc_req_finished(req);
136
137         return rc;
138 }
139
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
141                        struct obdo *oa)
142 {
143         struct ptlrpc_request   *req;
144         struct ost_body         *body;
145         int                      rc;
146
147         ENTRY;
148         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
149
150         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
151         if (req == NULL)
152                 RETURN(-ENOMEM);
153
154         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
155         if (rc) {
156                 ptlrpc_request_free(req);
157                 RETURN(rc);
158         }
159
160         osc_pack_req_body(req, oa);
161
162         ptlrpc_request_set_replen(req);
163
164         rc = ptlrpc_queue_wait(req);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
169         if (body == NULL)
170                 GOTO(out, rc = -EPROTO);
171
172         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
173
174         EXIT;
175 out:
176         ptlrpc_req_finished(req);
177
178         RETURN(rc);
179 }
180
181 static int osc_setattr_interpret(const struct lu_env *env,
182                                  struct ptlrpc_request *req, void *args, int rc)
183 {
184         struct osc_setattr_args *sa = args;
185         struct ost_body *body;
186
187         ENTRY;
188
189         if (rc != 0)
190                 GOTO(out, rc);
191
192         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193         if (body == NULL)
194                 GOTO(out, rc = -EPROTO);
195
196         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
197                              &body->oa);
198 out:
199         rc = sa->sa_upcall(sa->sa_cookie, rc);
200         RETURN(rc);
201 }
202
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204                       obd_enqueue_update_f upcall, void *cookie,
205                       struct ptlrpc_request_set *rqset)
206 {
207         struct ptlrpc_request   *req;
208         struct osc_setattr_args *sa;
209         int                      rc;
210
211         ENTRY;
212
213         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
214         if (req == NULL)
215                 RETURN(-ENOMEM);
216
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oa);
224
225         ptlrpc_request_set_replen(req);
226
227         /* do mds to ost setattr asynchronously */
228         if (!rqset) {
229                 /* Do not wait for response. */
230                 ptlrpcd_add_req(req);
231         } else {
232                 req->rq_interpret_reply = osc_setattr_interpret;
233
234                 sa = ptlrpc_req_async_args(sa, req);
235                 sa->sa_oa = oa;
236                 sa->sa_upcall = upcall;
237                 sa->sa_cookie = cookie;
238
239                 if (rqset == PTLRPCD_SET)
240                         ptlrpcd_add_req(req);
241                 else
242                         ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         if (rqset == PTLRPCD_SET)
328                 ptlrpcd_add_req(req);
329         else
330                 ptlrpc_set_add_req(rqset, req);
331
332         RETURN(0);
333 }
334
335 static int osc_create(const struct lu_env *env, struct obd_export *exp,
336                       struct obdo *oa)
337 {
338         struct ptlrpc_request *req;
339         struct ost_body       *body;
340         int                    rc;
341         ENTRY;
342
343         LASSERT(oa != NULL);
344         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
345         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
346
347         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
348         if (req == NULL)
349                 GOTO(out, rc = -ENOMEM);
350
351         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
352         if (rc) {
353                 ptlrpc_request_free(req);
354                 GOTO(out, rc);
355         }
356
357         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358         LASSERT(body);
359
360         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
361
362         ptlrpc_request_set_replen(req);
363
364         rc = ptlrpc_queue_wait(req);
365         if (rc)
366                 GOTO(out_req, rc);
367
368         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL)
370                 GOTO(out_req, rc = -EPROTO);
371
372         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
373         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
374
375         oa->o_blksize = cli_brw_size(exp->exp_obd);
376         oa->o_valid |= OBD_MD_FLBLKSZ;
377
378         CDEBUG(D_HA, "transno: %lld\n",
379                lustre_msg_get_transno(req->rq_repmsg));
380 out_req:
381         ptlrpc_req_finished(req);
382 out:
383         RETURN(rc);
384 }
385
386 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
387                    obd_enqueue_update_f upcall, void *cookie)
388 {
389         struct ptlrpc_request *req;
390         struct osc_setattr_args *sa;
391         struct obd_import *imp = class_exp2cliimp(exp);
392         struct ost_body *body;
393         int rc;
394
395         ENTRY;
396
397         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
398         if (req == NULL)
399                 RETURN(-ENOMEM);
400
401         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
402         if (rc < 0) {
403                 ptlrpc_request_free(req);
404                 RETURN(rc);
405         }
406
407         osc_set_io_portal(req);
408
409         ptlrpc_at_set_req_timeout(req);
410
411         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
412
413         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
414
415         ptlrpc_request_set_replen(req);
416
417         req->rq_interpret_reply = osc_setattr_interpret;
418         sa = ptlrpc_req_async_args(sa, req);
419         sa->sa_oa = oa;
420         sa->sa_upcall = upcall;
421         sa->sa_cookie = cookie;
422
423         ptlrpcd_add_req(req);
424
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(osc_punch_send);
428
429 static int osc_sync_interpret(const struct lu_env *env,
430                               struct ptlrpc_request *req, void *args, int rc)
431 {
432         struct osc_fsync_args *fa = args;
433         struct ost_body *body;
434         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
435         unsigned long valid = 0;
436         struct cl_object *obj;
437         ENTRY;
438
439         if (rc != 0)
440                 GOTO(out, rc);
441
442         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
443         if (body == NULL) {
444                 CERROR("can't unpack ost_body\n");
445                 GOTO(out, rc = -EPROTO);
446         }
447
448         *fa->fa_oa = body->oa;
449         obj = osc2cl(fa->fa_obj);
450
451         /* Update osc object's blocks attribute */
452         cl_object_attr_lock(obj);
453         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
454                 attr->cat_blocks = body->oa.o_blocks;
455                 valid |= CAT_BLOCKS;
456         }
457
458         if (valid != 0)
459                 cl_object_attr_update(env, obj, attr, valid);
460         cl_object_attr_unlock(obj);
461
462 out:
463         rc = fa->fa_upcall(fa->fa_cookie, rc);
464         RETURN(rc);
465 }
466
467 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
468                   obd_enqueue_update_f upcall, void *cookie,
469                   struct ptlrpc_request_set *rqset)
470 {
471         struct obd_export     *exp = osc_export(obj);
472         struct ptlrpc_request *req;
473         struct ost_body       *body;
474         struct osc_fsync_args *fa;
475         int                    rc;
476         ENTRY;
477
478         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
479         if (req == NULL)
480                 RETURN(-ENOMEM);
481
482         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
483         if (rc) {
484                 ptlrpc_request_free(req);
485                 RETURN(rc);
486         }
487
488         /* overload the size and blocks fields in the oa with start/end */
489         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
490         LASSERT(body);
491         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
492
493         ptlrpc_request_set_replen(req);
494         req->rq_interpret_reply = osc_sync_interpret;
495
496         fa = ptlrpc_req_async_args(fa, req);
497         fa->fa_obj = obj;
498         fa->fa_oa = oa;
499         fa->fa_upcall = upcall;
500         fa->fa_cookie = cookie;
501
502         if (rqset == PTLRPCD_SET)
503                 ptlrpcd_add_req(req);
504         else
505                 ptlrpc_set_add_req(rqset, req);
506
507         RETURN (0);
508 }
509
510 /* Find and cancel locally locks matched by @mode in the resource found by
511  * @objid. Found locks are added into @cancel list. Returns the amount of
512  * locks added to @cancels list. */
513 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
514                                    struct list_head *cancels,
515                                    enum ldlm_mode mode, __u64 lock_flags)
516 {
517         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
518         struct ldlm_res_id res_id;
519         struct ldlm_resource *res;
520         int count;
521         ENTRY;
522
523         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
524          * export) but disabled through procfs (flag in NS).
525          *
526          * This distinguishes from a case when ELC is not supported originally,
527          * when we still want to cancel locks in advance and just cancel them
528          * locally, without sending any RPC. */
529         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
530                 RETURN(0);
531
532         ostid_build_res_name(&oa->o_oi, &res_id);
533         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
534         if (IS_ERR(res))
535                 RETURN(0);
536
537         LDLM_RESOURCE_ADDREF(res);
538         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
539                                            lock_flags, 0, NULL);
540         LDLM_RESOURCE_DELREF(res);
541         ldlm_resource_putref(res);
542         RETURN(count);
543 }
544
545 static int osc_destroy_interpret(const struct lu_env *env,
546                                  struct ptlrpc_request *req, void *args, int rc)
547 {
548         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
549
550         atomic_dec(&cli->cl_destroy_in_flight);
551         wake_up(&cli->cl_destroy_waitq);
552
553         return 0;
554 }
555
556 static int osc_can_send_destroy(struct client_obd *cli)
557 {
558         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
559             cli->cl_max_rpcs_in_flight) {
560                 /* The destroy request can be sent */
561                 return 1;
562         }
563         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
564             cli->cl_max_rpcs_in_flight) {
565                 /*
566                  * The counter has been modified between the two atomic
567                  * operations.
568                  */
569                 wake_up(&cli->cl_destroy_waitq);
570         }
571         return 0;
572 }
573
574 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
575                        struct obdo *oa)
576 {
577         struct client_obd     *cli = &exp->exp_obd->u.cli;
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         LIST_HEAD(cancels);
581         int rc, count;
582         ENTRY;
583
584         if (!oa) {
585                 CDEBUG(D_INFO, "oa NULL\n");
586                 RETURN(-EINVAL);
587         }
588
589         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
590                                         LDLM_FL_DISCARD_DATA);
591
592         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
593         if (req == NULL) {
594                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
595                 RETURN(-ENOMEM);
596         }
597
598         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
599                                0, &cancels, count);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
606         ptlrpc_at_set_req_timeout(req);
607
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
611
612         ptlrpc_request_set_replen(req);
613
614         req->rq_interpret_reply = osc_destroy_interpret;
615         if (!osc_can_send_destroy(cli)) {
616                 /*
617                  * Wait until the number of on-going destroy RPCs drops
618                  * under max_rpc_in_flight
619                  */
620                 rc = l_wait_event_abortable_exclusive(
621                         cli->cl_destroy_waitq,
622                         osc_can_send_destroy(cli));
623                 if (rc) {
624                         ptlrpc_req_finished(req);
625                         RETURN(-EINTR);
626                 }
627         }
628
629         /* Do not wait for response */
630         ptlrpcd_add_req(req);
631         RETURN(0);
632 }
633
634 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
635                                 long writing_bytes)
636 {
637         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
638
639         LASSERT(!(oa->o_valid & bits));
640
641         oa->o_valid |= bits;
642         spin_lock(&cli->cl_loi_list_lock);
643         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
644                 oa->o_dirty = cli->cl_dirty_grant;
645         else
646                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
647         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
648                 CERROR("dirty %lu > dirty_max %lu\n",
649                        cli->cl_dirty_pages,
650                        cli->cl_dirty_max_pages);
651                 oa->o_undirty = 0;
652         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
653                             (long)(obd_max_dirty_pages + 1))) {
654                 /* The atomic_read() allowing the atomic_inc() are
655                  * not covered by a lock thus they may safely race and trip
656                  * this CERROR() unless we add in a small fudge factor (+1). */
657                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
658                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
659                        obd_max_dirty_pages);
660                 oa->o_undirty = 0;
661         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
662                             0x7fffffff)) {
663                 CERROR("dirty %lu - dirty_max %lu too big???\n",
664                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
665                 oa->o_undirty = 0;
666         } else {
667                 unsigned long nrpages;
668                 unsigned long undirty;
669
670                 nrpages = cli->cl_max_pages_per_rpc;
671                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
672                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
673                 undirty = nrpages << PAGE_SHIFT;
674                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
675                                  GRANT_PARAM)) {
676                         int nrextents;
677
678                         /* take extent tax into account when asking for more
679                          * grant space */
680                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
681                                      cli->cl_max_extent_pages;
682                         undirty += nrextents * cli->cl_grant_extent_tax;
683                 }
684                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
685                  * to add extent tax, etc.
686                  */
687                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
688                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
689         }
690         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
691         oa->o_dropped = cli->cl_lost_grant;
692         cli->cl_lost_grant = 0;
693         spin_unlock(&cli->cl_loi_list_lock);
694         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
695                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
696 }
697
698 void osc_update_next_shrink(struct client_obd *cli)
699 {
700         cli->cl_next_shrink_grant = ktime_get_seconds() +
701                                     cli->cl_grant_shrink_interval;
702
703         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
704                cli->cl_next_shrink_grant);
705 }
706
707 static void __osc_update_grant(struct client_obd *cli, u64 grant)
708 {
709         spin_lock(&cli->cl_loi_list_lock);
710         cli->cl_avail_grant += grant;
711         spin_unlock(&cli->cl_loi_list_lock);
712 }
713
714 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
715 {
716         if (body->oa.o_valid & OBD_MD_FLGRANT) {
717                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
718                 __osc_update_grant(cli, body->oa.o_grant);
719         }
720 }
721
722 /**
723  * grant thread data for shrinking space.
724  */
725 struct grant_thread_data {
726         struct list_head        gtd_clients;
727         struct mutex            gtd_mutex;
728         unsigned long           gtd_stopped:1;
729 };
730 static struct grant_thread_data client_gtd;
731
732 static int osc_shrink_grant_interpret(const struct lu_env *env,
733                                       struct ptlrpc_request *req,
734                                       void *args, int rc)
735 {
736         struct osc_grant_args *aa = args;
737         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
738         struct ost_body *body;
739
740         if (rc != 0) {
741                 __osc_update_grant(cli, aa->aa_oa->o_grant);
742                 GOTO(out, rc);
743         }
744
745         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
746         LASSERT(body);
747         osc_update_grant(cli, body);
748 out:
749         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
750         aa->aa_oa = NULL;
751
752         return rc;
753 }
754
755 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
756 {
757         spin_lock(&cli->cl_loi_list_lock);
758         oa->o_grant = cli->cl_avail_grant / 4;
759         cli->cl_avail_grant -= oa->o_grant;
760         spin_unlock(&cli->cl_loi_list_lock);
761         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
762                 oa->o_valid |= OBD_MD_FLFLAGS;
763                 oa->o_flags = 0;
764         }
765         oa->o_flags |= OBD_FL_SHRINK_GRANT;
766         osc_update_next_shrink(cli);
767 }
768
769 /* Shrink the current grant, either from some large amount to enough for a
770  * full set of in-flight RPCs, or if we have already shrunk to that limit
771  * then to enough for a single RPC.  This avoids keeping more grant than
772  * needed, and avoids shrinking the grant piecemeal. */
773 static int osc_shrink_grant(struct client_obd *cli)
774 {
775         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
776                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
777
778         spin_lock(&cli->cl_loi_list_lock);
779         if (cli->cl_avail_grant <= target_bytes)
780                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
781         spin_unlock(&cli->cl_loi_list_lock);
782
783         return osc_shrink_grant_to_target(cli, target_bytes);
784 }
785
786 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
787 {
788         int                     rc = 0;
789         struct ost_body        *body;
790         ENTRY;
791
792         spin_lock(&cli->cl_loi_list_lock);
793         /* Don't shrink if we are already above or below the desired limit
794          * We don't want to shrink below a single RPC, as that will negatively
795          * impact block allocation and long-term performance. */
796         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
797                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
798
799         if (target_bytes >= cli->cl_avail_grant) {
800                 spin_unlock(&cli->cl_loi_list_lock);
801                 RETURN(0);
802         }
803         spin_unlock(&cli->cl_loi_list_lock);
804
805         OBD_ALLOC_PTR(body);
806         if (!body)
807                 RETURN(-ENOMEM);
808
809         osc_announce_cached(cli, &body->oa, 0);
810
811         spin_lock(&cli->cl_loi_list_lock);
812         if (target_bytes >= cli->cl_avail_grant) {
813                 /* available grant has changed since target calculation */
814                 spin_unlock(&cli->cl_loi_list_lock);
815                 GOTO(out_free, rc = 0);
816         }
817         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
818         cli->cl_avail_grant = target_bytes;
819         spin_unlock(&cli->cl_loi_list_lock);
820         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
821                 body->oa.o_valid |= OBD_MD_FLFLAGS;
822                 body->oa.o_flags = 0;
823         }
824         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
825         osc_update_next_shrink(cli);
826
827         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
828                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
829                                 sizeof(*body), body, NULL);
830         if (rc != 0)
831                 __osc_update_grant(cli, body->oa.o_grant);
832 out_free:
833         OBD_FREE_PTR(body);
834         RETURN(rc);
835 }
836
837 static int osc_should_shrink_grant(struct client_obd *client)
838 {
839         time64_t next_shrink = client->cl_next_shrink_grant;
840
841         if (client->cl_import == NULL)
842                 return 0;
843
844         if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
845             client->cl_import->imp_grant_shrink_disabled) {
846                 osc_update_next_shrink(client);
847                 return 0;
848         }
849
850         if (ktime_get_seconds() >= next_shrink - 5) {
851                 /* Get the current RPC size directly, instead of going via:
852                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
853                  * Keep comment here so that it can be found by searching. */
854                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
855
856                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
857                     client->cl_avail_grant > brw_size)
858                         return 1;
859                 else
860                         osc_update_next_shrink(client);
861         }
862         return 0;
863 }
864
865 #define GRANT_SHRINK_RPC_BATCH  100
866
867 static struct delayed_work work;
868
869 static void osc_grant_work_handler(struct work_struct *data)
870 {
871         struct client_obd *cli;
872         int rpc_sent;
873         bool init_next_shrink = true;
874         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
875
876         rpc_sent = 0;
877         mutex_lock(&client_gtd.gtd_mutex);
878         list_for_each_entry(cli, &client_gtd.gtd_clients,
879                             cl_grant_chain) {
880                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
881                     osc_should_shrink_grant(cli)) {
882                         osc_shrink_grant(cli);
883                         rpc_sent++;
884                 }
885
886                 if (!init_next_shrink) {
887                         if (cli->cl_next_shrink_grant < next_shrink &&
888                             cli->cl_next_shrink_grant > ktime_get_seconds())
889                                 next_shrink = cli->cl_next_shrink_grant;
890                 } else {
891                         init_next_shrink = false;
892                         next_shrink = cli->cl_next_shrink_grant;
893                 }
894         }
895         mutex_unlock(&client_gtd.gtd_mutex);
896
897         if (client_gtd.gtd_stopped == 1)
898                 return;
899
900         if (next_shrink > ktime_get_seconds()) {
901                 time64_t delay = next_shrink - ktime_get_seconds();
902
903                 schedule_delayed_work(&work, cfs_time_seconds(delay));
904         } else {
905                 schedule_work(&work.work);
906         }
907 }
908
909 void osc_schedule_grant_work(void)
910 {
911         cancel_delayed_work_sync(&work);
912         schedule_work(&work.work);
913 }
914
915 /**
916  * Start grant thread for returing grant to server for idle clients.
917  */
918 static int osc_start_grant_work(void)
919 {
920         client_gtd.gtd_stopped = 0;
921         mutex_init(&client_gtd.gtd_mutex);
922         INIT_LIST_HEAD(&client_gtd.gtd_clients);
923
924         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
925         schedule_work(&work.work);
926
927         return 0;
928 }
929
930 static void osc_stop_grant_work(void)
931 {
932         client_gtd.gtd_stopped = 1;
933         cancel_delayed_work_sync(&work);
934 }
935
936 static void osc_add_grant_list(struct client_obd *client)
937 {
938         mutex_lock(&client_gtd.gtd_mutex);
939         list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
940         mutex_unlock(&client_gtd.gtd_mutex);
941 }
942
943 static void osc_del_grant_list(struct client_obd *client)
944 {
945         if (list_empty(&client->cl_grant_chain))
946                 return;
947
948         mutex_lock(&client_gtd.gtd_mutex);
949         list_del_init(&client->cl_grant_chain);
950         mutex_unlock(&client_gtd.gtd_mutex);
951 }
952
953 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
954 {
955         /*
956          * ocd_grant is the total grant amount we're expect to hold: if we've
957          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
958          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
959          * dirty.
960          *
961          * race is tolerable here: if we're evicted, but imp_state already
962          * left EVICTED state, then cl_dirty_pages must be 0 already.
963          */
964         spin_lock(&cli->cl_loi_list_lock);
965         cli->cl_avail_grant = ocd->ocd_grant;
966         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
967                 cli->cl_avail_grant -= cli->cl_reserved_grant;
968                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
969                         cli->cl_avail_grant -= cli->cl_dirty_grant;
970                 else
971                         cli->cl_avail_grant -=
972                                         cli->cl_dirty_pages << PAGE_SHIFT;
973         }
974
975         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
976                 u64 size;
977                 int chunk_mask;
978
979                 /* overhead for each extent insertion */
980                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
981                 /* determine the appropriate chunk size used by osc_extent. */
982                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
983                                           ocd->ocd_grant_blkbits);
984                 /* max_pages_per_rpc must be chunk aligned */
985                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
986                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
987                                              ~chunk_mask) & chunk_mask;
988                 /* determine maximum extent size, in #pages */
989                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
990                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
991                 if (cli->cl_max_extent_pages == 0)
992                         cli->cl_max_extent_pages = 1;
993         } else {
994                 cli->cl_grant_extent_tax = 0;
995                 cli->cl_chunkbits = PAGE_SHIFT;
996                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
997         }
998         spin_unlock(&cli->cl_loi_list_lock);
999
1000         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1001                 "chunk bits: %d cl_max_extent_pages: %d\n",
1002                 cli_name(cli),
1003                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1004                 cli->cl_max_extent_pages);
1005
1006         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1007                 osc_add_grant_list(cli);
1008 }
1009 EXPORT_SYMBOL(osc_init_grant);
1010
1011 /* We assume that the reason this OSC got a short read is because it read
1012  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1013  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1014  * this stripe never got written at or beyond this stripe offset yet. */
1015 static void handle_short_read(int nob_read, size_t page_count,
1016                               struct brw_page **pga)
1017 {
1018         char *ptr;
1019         int i = 0;
1020
1021         /* skip bytes read OK */
1022         while (nob_read > 0) {
1023                 LASSERT (page_count > 0);
1024
1025                 if (pga[i]->count > nob_read) {
1026                         /* EOF inside this page */
1027                         ptr = kmap(pga[i]->pg) +
1028                                 (pga[i]->off & ~PAGE_MASK);
1029                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1030                         kunmap(pga[i]->pg);
1031                         page_count--;
1032                         i++;
1033                         break;
1034                 }
1035
1036                 nob_read -= pga[i]->count;
1037                 page_count--;
1038                 i++;
1039         }
1040
1041         /* zero remaining pages */
1042         while (page_count-- > 0) {
1043                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1044                 memset(ptr, 0, pga[i]->count);
1045                 kunmap(pga[i]->pg);
1046                 i++;
1047         }
1048 }
1049
1050 static int check_write_rcs(struct ptlrpc_request *req,
1051                            int requested_nob, int niocount,
1052                            size_t page_count, struct brw_page **pga)
1053 {
1054         int     i;
1055         __u32   *remote_rcs;
1056
1057         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1058                                                   sizeof(*remote_rcs) *
1059                                                   niocount);
1060         if (remote_rcs == NULL) {
1061                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1062                 return(-EPROTO);
1063         }
1064
1065         /* return error if any niobuf was in error */
1066         for (i = 0; i < niocount; i++) {
1067                 if ((int)remote_rcs[i] < 0) {
1068                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1069                                i, remote_rcs[i], req);
1070                         return remote_rcs[i];
1071                 }
1072
1073                 if (remote_rcs[i] != 0) {
1074                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1075                                 i, remote_rcs[i], req);
1076                         return(-EPROTO);
1077                 }
1078         }
1079         if (req->rq_bulk != NULL &&
1080             req->rq_bulk->bd_nob_transferred != requested_nob) {
1081                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1082                        req->rq_bulk->bd_nob_transferred, requested_nob);
1083                 return(-EPROTO);
1084         }
1085
1086         return (0);
1087 }
1088
1089 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1090 {
1091         if (p1->flag != p2->flag) {
1092                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1093                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1094                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1095
1096                 /* warn if we try to combine flags that we don't know to be
1097                  * safe to combine */
1098                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1099                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1100                               "report this at https://jira.whamcloud.com/\n",
1101                               p1->flag, p2->flag);
1102                 }
1103                 return 0;
1104         }
1105
1106         return (p1->off + p1->count == p2->off);
1107 }
1108
1109 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1110 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1111                                    size_t pg_count, struct brw_page **pga,
1112                                    int opc, obd_dif_csum_fn *fn,
1113                                    int sector_size,
1114                                    u32 *check_sum)
1115 {
1116         struct ahash_request *req;
1117         /* Used Adler as the default checksum type on top of DIF tags */
1118         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1119         struct page *__page;
1120         unsigned char *buffer;
1121         __u16 *guard_start;
1122         unsigned int bufsize;
1123         int guard_number;
1124         int used_number = 0;
1125         int used;
1126         u32 cksum;
1127         int rc = 0;
1128         int i = 0;
1129
1130         LASSERT(pg_count > 0);
1131
1132         __page = alloc_page(GFP_KERNEL);
1133         if (__page == NULL)
1134                 return -ENOMEM;
1135
1136         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1137         if (IS_ERR(req)) {
1138                 rc = PTR_ERR(req);
1139                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1140                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1141                 GOTO(out, rc);
1142         }
1143
1144         buffer = kmap(__page);
1145         guard_start = (__u16 *)buffer;
1146         guard_number = PAGE_SIZE / sizeof(*guard_start);
1147         while (nob > 0 && pg_count > 0) {
1148                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1149
1150                 /* corrupt the data before we compute the checksum, to
1151                  * simulate an OST->client data error */
1152                 if (unlikely(i == 0 && opc == OST_READ &&
1153                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1154                         unsigned char *ptr = kmap(pga[i]->pg);
1155                         int off = pga[i]->off & ~PAGE_MASK;
1156
1157                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1158                         kunmap(pga[i]->pg);
1159                 }
1160
1161                 /*
1162                  * The left guard number should be able to hold checksums of a
1163                  * whole page
1164                  */
1165                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1166                                                   pga[i]->off & ~PAGE_MASK,
1167                                                   count,
1168                                                   guard_start + used_number,
1169                                                   guard_number - used_number,
1170                                                   &used, sector_size,
1171                                                   fn);
1172                 if (rc)
1173                         break;
1174
1175                 used_number += used;
1176                 if (used_number == guard_number) {
1177                         cfs_crypto_hash_update_page(req, __page, 0,
1178                                 used_number * sizeof(*guard_start));
1179                         used_number = 0;
1180                 }
1181
1182                 nob -= pga[i]->count;
1183                 pg_count--;
1184                 i++;
1185         }
1186         kunmap(__page);
1187         if (rc)
1188                 GOTO(out, rc);
1189
1190         if (used_number != 0)
1191                 cfs_crypto_hash_update_page(req, __page, 0,
1192                         used_number * sizeof(*guard_start));
1193
1194         bufsize = sizeof(cksum);
1195         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1196
1197         /* For sending we only compute the wrong checksum instead
1198          * of corrupting the data so it is still correct on a redo */
1199         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1200                 cksum++;
1201
1202         *check_sum = cksum;
1203 out:
1204         __free_page(__page);
1205         return rc;
1206 }
1207 #else /* !CONFIG_CRC_T10DIF */
1208 #define obd_dif_ip_fn NULL
1209 #define obd_dif_crc_fn NULL
1210 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1211         -EOPNOTSUPP
1212 #endif /* CONFIG_CRC_T10DIF */
1213
1214 static int osc_checksum_bulk(int nob, size_t pg_count,
1215                              struct brw_page **pga, int opc,
1216                              enum cksum_types cksum_type,
1217                              u32 *cksum)
1218 {
1219         int                             i = 0;
1220         struct ahash_request           *req;
1221         unsigned int                    bufsize;
1222         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1223
1224         LASSERT(pg_count > 0);
1225
1226         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1227         if (IS_ERR(req)) {
1228                 CERROR("Unable to initialize checksum hash %s\n",
1229                        cfs_crypto_hash_name(cfs_alg));
1230                 return PTR_ERR(req);
1231         }
1232
1233         while (nob > 0 && pg_count > 0) {
1234                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1235
1236                 /* corrupt the data before we compute the checksum, to
1237                  * simulate an OST->client data error */
1238                 if (i == 0 && opc == OST_READ &&
1239                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1240                         unsigned char *ptr = kmap(pga[i]->pg);
1241                         int off = pga[i]->off & ~PAGE_MASK;
1242
1243                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1244                         kunmap(pga[i]->pg);
1245                 }
1246                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1247                                             pga[i]->off & ~PAGE_MASK,
1248                                             count);
1249                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1250                                (int)(pga[i]->off & ~PAGE_MASK));
1251
1252                 nob -= pga[i]->count;
1253                 pg_count--;
1254                 i++;
1255         }
1256
1257         bufsize = sizeof(*cksum);
1258         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1259
1260         /* For sending we only compute the wrong checksum instead
1261          * of corrupting the data so it is still correct on a redo */
1262         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1263                 (*cksum)++;
1264
1265         return 0;
1266 }
1267
1268 static int osc_checksum_bulk_rw(const char *obd_name,
1269                                 enum cksum_types cksum_type,
1270                                 int nob, size_t pg_count,
1271                                 struct brw_page **pga, int opc,
1272                                 u32 *check_sum)
1273 {
1274         obd_dif_csum_fn *fn = NULL;
1275         int sector_size = 0;
1276         int rc;
1277
1278         ENTRY;
1279         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1280
1281         if (fn)
1282                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1283                                              opc, fn, sector_size, check_sum);
1284         else
1285                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1286                                        check_sum);
1287
1288         RETURN(rc);
1289 }
1290
1291 static int
1292 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1293                      u32 page_count, struct brw_page **pga,
1294                      struct ptlrpc_request **reqp, int resend)
1295 {
1296         struct ptlrpc_request   *req;
1297         struct ptlrpc_bulk_desc *desc;
1298         struct ost_body         *body;
1299         struct obd_ioobj        *ioobj;
1300         struct niobuf_remote    *niobuf;
1301         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1302         struct osc_brw_async_args *aa;
1303         struct req_capsule      *pill;
1304         struct brw_page *pg_prev;
1305         void *short_io_buf;
1306         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1307
1308         ENTRY;
1309         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1310                 RETURN(-ENOMEM); /* Recoverable */
1311         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1312                 RETURN(-EINVAL); /* Fatal */
1313
1314         if ((cmd & OBD_BRW_WRITE) != 0) {
1315                 opc = OST_WRITE;
1316                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1317                                                 osc_rq_pool,
1318                                                 &RQF_OST_BRW_WRITE);
1319         } else {
1320                 opc = OST_READ;
1321                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1322         }
1323         if (req == NULL)
1324                 RETURN(-ENOMEM);
1325
1326         for (niocount = i = 1; i < page_count; i++) {
1327                 if (!can_merge_pages(pga[i - 1], pga[i]))
1328                         niocount++;
1329         }
1330
1331         pill = &req->rq_pill;
1332         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1333                              sizeof(*ioobj));
1334         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1335                              niocount * sizeof(*niobuf));
1336
1337         for (i = 0; i < page_count; i++)
1338                 short_io_size += pga[i]->count;
1339
1340         /* Check if read/write is small enough to be a short io. */
1341         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1342             !imp_connect_shortio(cli->cl_import))
1343                 short_io_size = 0;
1344
1345         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1346                              opc == OST_READ ? 0 : short_io_size);
1347         if (opc == OST_READ)
1348                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1349                                      short_io_size);
1350
1351         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1352         if (rc) {
1353                 ptlrpc_request_free(req);
1354                 RETURN(rc);
1355         }
1356         osc_set_io_portal(req);
1357
1358         ptlrpc_at_set_req_timeout(req);
1359         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1360          * retry logic */
1361         req->rq_no_retry_einprogress = 1;
1362
1363         if (short_io_size != 0) {
1364                 desc = NULL;
1365                 short_io_buf = NULL;
1366                 goto no_bulk;
1367         }
1368
1369         desc = ptlrpc_prep_bulk_imp(req, page_count,
1370                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1371                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1372                         PTLRPC_BULK_PUT_SINK) |
1373                         PTLRPC_BULK_BUF_KIOV,
1374                 OST_BULK_PORTAL,
1375                 &ptlrpc_bulk_kiov_pin_ops);
1376
1377         if (desc == NULL)
1378                 GOTO(out, rc = -ENOMEM);
1379         /* NB request now owns desc and will free it when it gets freed */
1380 no_bulk:
1381         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1382         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1383         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1384         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1385
1386         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1387
1388         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1389          * and from_kgid(), because they are asynchronous. Fortunately, variable
1390          * oa contains valid o_uid and o_gid in these two operations.
1391          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1392          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1393          * other process logic */
1394         body->oa.o_uid = oa->o_uid;
1395         body->oa.o_gid = oa->o_gid;
1396
1397         obdo_to_ioobj(oa, ioobj);
1398         ioobj->ioo_bufcnt = niocount;
1399         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1400          * that might be send for this request.  The actual number is decided
1401          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1402          * "max - 1" for old client compatibility sending "0", and also so the
1403          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1404         if (desc != NULL)
1405                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1406         else /* short io */
1407                 ioobj_max_brw_set(ioobj, 0);
1408
1409         if (short_io_size != 0) {
1410                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1411                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1412                         body->oa.o_flags = 0;
1413                 }
1414                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1415                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1416                        short_io_size);
1417                 if (opc == OST_WRITE) {
1418                         short_io_buf = req_capsule_client_get(pill,
1419                                                               &RMF_SHORT_IO);
1420                         LASSERT(short_io_buf != NULL);
1421                 }
1422         }
1423
1424         LASSERT(page_count > 0);
1425         pg_prev = pga[0];
1426         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1427                 struct brw_page *pg = pga[i];
1428                 int poff = pg->off & ~PAGE_MASK;
1429
1430                 LASSERT(pg->count > 0);
1431                 /* make sure there is no gap in the middle of page array */
1432                 LASSERTF(page_count == 1 ||
1433                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1434                           ergo(i > 0 && i < page_count - 1,
1435                                poff == 0 && pg->count == PAGE_SIZE)   &&
1436                           ergo(i == page_count - 1, poff == 0)),
1437                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1438                          i, page_count, pg, pg->off, pg->count);
1439                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1440                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1441                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1442                          i, page_count,
1443                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1444                          pg_prev->pg, page_private(pg_prev->pg),
1445                          pg_prev->pg->index, pg_prev->off);
1446                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1447                         (pg->flag & OBD_BRW_SRVLOCK));
1448                 if (short_io_size != 0 && opc == OST_WRITE) {
1449                         unsigned char *ptr = kmap_atomic(pg->pg);
1450
1451                         LASSERT(short_io_size >= requested_nob + pg->count);
1452                         memcpy(short_io_buf + requested_nob,
1453                                ptr + poff,
1454                                pg->count);
1455                         kunmap_atomic(ptr);
1456                 } else if (short_io_size == 0) {
1457                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1458                                                          pg->count);
1459                 }
1460                 requested_nob += pg->count;
1461
1462                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1463                         niobuf--;
1464                         niobuf->rnb_len += pg->count;
1465                 } else {
1466                         niobuf->rnb_offset = pg->off;
1467                         niobuf->rnb_len    = pg->count;
1468                         niobuf->rnb_flags  = pg->flag;
1469                 }
1470                 pg_prev = pg;
1471         }
1472
1473         LASSERTF((void *)(niobuf - niocount) ==
1474                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1475                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1476                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1477
1478         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1479         if (resend) {
1480                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1481                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1482                         body->oa.o_flags = 0;
1483                 }
1484                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1485         }
1486
1487         if (osc_should_shrink_grant(cli))
1488                 osc_shrink_grant_local(cli, &body->oa);
1489
1490         /* size[REQ_REC_OFF] still sizeof (*body) */
1491         if (opc == OST_WRITE) {
1492                 if (cli->cl_checksum &&
1493                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1494                         /* store cl_cksum_type in a local variable since
1495                          * it can be changed via lprocfs */
1496                         enum cksum_types cksum_type = cli->cl_cksum_type;
1497
1498                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1499                                 body->oa.o_flags = 0;
1500
1501                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1502                                                                 cksum_type);
1503                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1504
1505                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1506                                                   requested_nob, page_count,
1507                                                   pga, OST_WRITE,
1508                                                   &body->oa.o_cksum);
1509                         if (rc < 0) {
1510                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1511                                        rc);
1512                                 GOTO(out, rc);
1513                         }
1514                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1515                                body->oa.o_cksum);
1516
1517                         /* save this in 'oa', too, for later checking */
1518                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1519                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1520                                                            cksum_type);
1521                 } else {
1522                         /* clear out the checksum flag, in case this is a
1523                          * resend but cl_checksum is no longer set. b=11238 */
1524                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1525                 }
1526                 oa->o_cksum = body->oa.o_cksum;
1527                 /* 1 RC per niobuf */
1528                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1529                                      sizeof(__u32) * niocount);
1530         } else {
1531                 if (cli->cl_checksum &&
1532                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1533                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1534                                 body->oa.o_flags = 0;
1535                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1536                                 cli->cl_cksum_type);
1537                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1538                 }
1539
1540                 /* Client cksum has been already copied to wire obdo in previous
1541                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1542                  * resent due to cksum error, this will allow Server to
1543                  * check+dump pages on its side */
1544         }
1545         ptlrpc_request_set_replen(req);
1546
1547         aa = ptlrpc_req_async_args(aa, req);
1548         aa->aa_oa = oa;
1549         aa->aa_requested_nob = requested_nob;
1550         aa->aa_nio_count = niocount;
1551         aa->aa_page_count = page_count;
1552         aa->aa_resends = 0;
1553         aa->aa_ppga = pga;
1554         aa->aa_cli = cli;
1555         INIT_LIST_HEAD(&aa->aa_oaps);
1556
1557         *reqp = req;
1558         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1559         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1560                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1561                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1562         RETURN(0);
1563
1564  out:
1565         ptlrpc_req_finished(req);
1566         RETURN(rc);
1567 }
1568
1569 char dbgcksum_file_name[PATH_MAX];
1570
1571 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1572                                 struct brw_page **pga, __u32 server_cksum,
1573                                 __u32 client_cksum)
1574 {
1575         struct file *filp;
1576         int rc, i;
1577         unsigned int len;
1578         char *buf;
1579
1580         /* will only keep dump of pages on first error for the same range in
1581          * file/fid, not during the resends/retries. */
1582         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1583                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1584                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1585                   libcfs_debug_file_path_arr :
1586                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1587                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1590                  pga[0]->off,
1591                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1592                  client_cksum, server_cksum);
1593         filp = filp_open(dbgcksum_file_name,
1594                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1595         if (IS_ERR(filp)) {
1596                 rc = PTR_ERR(filp);
1597                 if (rc == -EEXIST)
1598                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1599                                "checksum error: rc = %d\n", dbgcksum_file_name,
1600                                rc);
1601                 else
1602                         CERROR("%s: can't open to dump pages with checksum "
1603                                "error: rc = %d\n", dbgcksum_file_name, rc);
1604                 return;
1605         }
1606
1607         for (i = 0; i < page_count; i++) {
1608                 len = pga[i]->count;
1609                 buf = kmap(pga[i]->pg);
1610                 while (len != 0) {
1611                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1612                         if (rc < 0) {
1613                                 CERROR("%s: wanted to write %u but got %d "
1614                                        "error\n", dbgcksum_file_name, len, rc);
1615                                 break;
1616                         }
1617                         len -= rc;
1618                         buf += rc;
1619                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1620                                dbgcksum_file_name, rc);
1621                 }
1622                 kunmap(pga[i]->pg);
1623         }
1624
1625         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1626         if (rc)
1627                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1628         filp_close(filp, NULL);
1629 }
1630
1631 static int
1632 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1633                      __u32 client_cksum, __u32 server_cksum,
1634                      struct osc_brw_async_args *aa)
1635 {
1636         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1637         enum cksum_types cksum_type;
1638         obd_dif_csum_fn *fn = NULL;
1639         int sector_size = 0;
1640         __u32 new_cksum;
1641         char *msg;
1642         int rc;
1643
1644         if (server_cksum == client_cksum) {
1645                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1646                 return 0;
1647         }
1648
1649         if (aa->aa_cli->cl_checksum_dump)
1650                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1651                                     server_cksum, client_cksum);
1652
1653         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1654                                            oa->o_flags : 0);
1655
1656         switch (cksum_type) {
1657         case OBD_CKSUM_T10IP512:
1658                 fn = obd_dif_ip_fn;
1659                 sector_size = 512;
1660                 break;
1661         case OBD_CKSUM_T10IP4K:
1662                 fn = obd_dif_ip_fn;
1663                 sector_size = 4096;
1664                 break;
1665         case OBD_CKSUM_T10CRC512:
1666                 fn = obd_dif_crc_fn;
1667                 sector_size = 512;
1668                 break;
1669         case OBD_CKSUM_T10CRC4K:
1670                 fn = obd_dif_crc_fn;
1671                 sector_size = 4096;
1672                 break;
1673         default:
1674                 break;
1675         }
1676
1677         if (fn)
1678                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1679                                              aa->aa_page_count, aa->aa_ppga,
1680                                              OST_WRITE, fn, sector_size,
1681                                              &new_cksum);
1682         else
1683                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1684                                        aa->aa_ppga, OST_WRITE, cksum_type,
1685                                        &new_cksum);
1686
1687         if (rc < 0)
1688                 msg = "failed to calculate the client write checksum";
1689         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1690                 msg = "the server did not use the checksum type specified in "
1691                       "the original request - likely a protocol problem";
1692         else if (new_cksum == server_cksum)
1693                 msg = "changed on the client after we checksummed it - "
1694                       "likely false positive due to mmap IO (bug 11742)";
1695         else if (new_cksum == client_cksum)
1696                 msg = "changed in transit before arrival at OST";
1697         else
1698                 msg = "changed in transit AND doesn't match the original - "
1699                       "likely false positive due to mmap IO (bug 11742)";
1700
1701         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1702                            DFID " object "DOSTID" extent [%llu-%llu], original "
1703                            "client csum %x (type %x), server csum %x (type %x),"
1704                            " client csum now %x\n",
1705                            obd_name, msg, libcfs_nid2str(peer->nid),
1706                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1707                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1709                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1710                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1711                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1712                            client_cksum,
1713                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1714                            server_cksum, cksum_type, new_cksum);
1715         return 1;
1716 }
1717
1718 /* Note rc enters this function as number of bytes transferred */
1719 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1720 {
1721         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1722         struct client_obd *cli = aa->aa_cli;
1723         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1724         const struct lnet_process_id *peer =
1725                 &req->rq_import->imp_connection->c_peer;
1726         struct ost_body *body;
1727         u32 client_cksum = 0;
1728
1729         ENTRY;
1730
1731         if (rc < 0 && rc != -EDQUOT) {
1732                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1733                 RETURN(rc);
1734         }
1735
1736         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1737         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1738         if (body == NULL) {
1739                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1740                 RETURN(-EPROTO);
1741         }
1742
1743         /* set/clear over quota flag for a uid/gid/projid */
1744         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1745             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1746                 unsigned qid[LL_MAXQUOTAS] = {
1747                                          body->oa.o_uid, body->oa.o_gid,
1748                                          body->oa.o_projid };
1749                 CDEBUG(D_QUOTA,
1750                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1751                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1752                        body->oa.o_valid, body->oa.o_flags);
1753                        osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1754                                        body->oa.o_flags);
1755         }
1756
1757         osc_update_grant(cli, body);
1758
1759         if (rc < 0)
1760                 RETURN(rc);
1761
1762         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1763                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1764
1765         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1766                 if (rc > 0) {
1767                         CERROR("%s: unexpected positive size %d\n",
1768                                obd_name, rc);
1769                         RETURN(-EPROTO);
1770                 }
1771
1772                 if (req->rq_bulk != NULL &&
1773                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1774                         RETURN(-EAGAIN);
1775
1776                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1777                     check_write_checksum(&body->oa, peer, client_cksum,
1778                                          body->oa.o_cksum, aa))
1779                         RETURN(-EAGAIN);
1780
1781                 rc = check_write_rcs(req, aa->aa_requested_nob,
1782                                      aa->aa_nio_count, aa->aa_page_count,
1783                                      aa->aa_ppga);
1784                 GOTO(out, rc);
1785         }
1786
1787         /* The rest of this function executes only for OST_READs */
1788
1789         if (req->rq_bulk == NULL) {
1790                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1791                                           RCL_SERVER);
1792                 LASSERT(rc == req->rq_status);
1793         } else {
1794                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1795                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1796         }
1797         if (rc < 0)
1798                 GOTO(out, rc = -EAGAIN);
1799
1800         if (rc > aa->aa_requested_nob) {
1801                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1802                        rc, aa->aa_requested_nob);
1803                 RETURN(-EPROTO);
1804         }
1805
1806         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1807                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1808                        rc, req->rq_bulk->bd_nob_transferred);
1809                 RETURN(-EPROTO);
1810         }
1811
1812         if (req->rq_bulk == NULL) {
1813                 /* short io */
1814                 int nob, pg_count, i = 0;
1815                 unsigned char *buf;
1816
1817                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1818                 pg_count = aa->aa_page_count;
1819                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1820                                                    rc);
1821                 nob = rc;
1822                 while (nob > 0 && pg_count > 0) {
1823                         unsigned char *ptr;
1824                         int count = aa->aa_ppga[i]->count > nob ?
1825                                     nob : aa->aa_ppga[i]->count;
1826
1827                         CDEBUG(D_CACHE, "page %p count %d\n",
1828                                aa->aa_ppga[i]->pg, count);
1829                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1830                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1831                                count);
1832                         kunmap_atomic((void *) ptr);
1833
1834                         buf += count;
1835                         nob -= count;
1836                         i++;
1837                         pg_count--;
1838                 }
1839         }
1840
1841         if (rc < aa->aa_requested_nob)
1842                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1843
1844         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1845                 static int cksum_counter;
1846                 u32        server_cksum = body->oa.o_cksum;
1847                 char      *via = "";
1848                 char      *router = "";
1849                 enum cksum_types cksum_type;
1850                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1851                         body->oa.o_flags : 0;
1852
1853                 cksum_type = obd_cksum_type_unpack(o_flags);
1854                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1855                                           aa->aa_page_count, aa->aa_ppga,
1856                                           OST_READ, &client_cksum);
1857                 if (rc < 0)
1858                         GOTO(out, rc);
1859
1860                 if (req->rq_bulk != NULL &&
1861                     peer->nid != req->rq_bulk->bd_sender) {
1862                         via = " via ";
1863                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1864                 }
1865
1866                 if (server_cksum != client_cksum) {
1867                         struct ost_body *clbody;
1868                         u32 page_count = aa->aa_page_count;
1869
1870                         clbody = req_capsule_client_get(&req->rq_pill,
1871                                                         &RMF_OST_BODY);
1872                         if (cli->cl_checksum_dump)
1873                                 dump_all_bulk_pages(&clbody->oa, page_count,
1874                                                     aa->aa_ppga, server_cksum,
1875                                                     client_cksum);
1876
1877                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1878                                            "%s%s%s inode "DFID" object "DOSTID
1879                                            " extent [%llu-%llu], client %x, "
1880                                            "server %x, cksum_type %x\n",
1881                                            obd_name,
1882                                            libcfs_nid2str(peer->nid),
1883                                            via, router,
1884                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1885                                                 clbody->oa.o_parent_seq : 0ULL,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_oid : 0,
1888                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1889                                                 clbody->oa.o_parent_ver : 0,
1890                                            POSTID(&body->oa.o_oi),
1891                                            aa->aa_ppga[0]->off,
1892                                            aa->aa_ppga[page_count-1]->off +
1893                                            aa->aa_ppga[page_count-1]->count - 1,
1894                                            client_cksum, server_cksum,
1895                                            cksum_type);
1896                         cksum_counter = 0;
1897                         aa->aa_oa->o_cksum = client_cksum;
1898                         rc = -EAGAIN;
1899                 } else {
1900                         cksum_counter++;
1901                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1902                         rc = 0;
1903                 }
1904         } else if (unlikely(client_cksum)) {
1905                 static int cksum_missed;
1906
1907                 cksum_missed++;
1908                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1909                         CERROR("%s: checksum %u requested from %s but not sent\n",
1910                                obd_name, cksum_missed,
1911                                libcfs_nid2str(peer->nid));
1912         } else {
1913                 rc = 0;
1914         }
1915 out:
1916         if (rc >= 0)
1917                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1918                                      aa->aa_oa, &body->oa);
1919
1920         RETURN(rc);
1921 }
1922
1923 static int osc_brw_redo_request(struct ptlrpc_request *request,
1924                                 struct osc_brw_async_args *aa, int rc)
1925 {
1926         struct ptlrpc_request *new_req;
1927         struct osc_brw_async_args *new_aa;
1928         struct osc_async_page *oap;
1929         ENTRY;
1930
1931         /* The below message is checked in replay-ost-single.sh test_8ae*/
1932         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1933                   "redo for recoverable error %d", rc);
1934
1935         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1936                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1937                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1938                                   aa->aa_ppga, &new_req, 1);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1943                 if (oap->oap_request != NULL) {
1944                         LASSERTF(request == oap->oap_request,
1945                                  "request %p != oap_request %p\n",
1946                                  request, oap->oap_request);
1947                         if (oap->oap_interrupted) {
1948                                 ptlrpc_req_finished(new_req);
1949                                 RETURN(-EINTR);
1950                         }
1951                 }
1952         }
1953         /*
1954          * New request takes over pga and oaps from old request.
1955          * Note that copying a list_head doesn't work, need to move it...
1956          */
1957         aa->aa_resends++;
1958         new_req->rq_interpret_reply = request->rq_interpret_reply;
1959         new_req->rq_async_args = request->rq_async_args;
1960         new_req->rq_commit_cb = request->rq_commit_cb;
1961         /* cap resend delay to the current request timeout, this is similar to
1962          * what ptlrpc does (see after_reply()) */
1963         if (aa->aa_resends > new_req->rq_timeout)
1964                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1965         else
1966                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1967         new_req->rq_generation_set = 1;
1968         new_req->rq_import_generation = request->rq_import_generation;
1969
1970         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1971
1972         INIT_LIST_HEAD(&new_aa->aa_oaps);
1973         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1974         INIT_LIST_HEAD(&new_aa->aa_exts);
1975         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1976         new_aa->aa_resends = aa->aa_resends;
1977
1978         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1979                 if (oap->oap_request) {
1980                         ptlrpc_req_finished(oap->oap_request);
1981                         oap->oap_request = ptlrpc_request_addref(new_req);
1982                 }
1983         }
1984
1985         /* XXX: This code will run into problem if we're going to support
1986          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1987          * and wait for all of them to be finished. We should inherit request
1988          * set from old request. */
1989         ptlrpcd_add_req(new_req);
1990
1991         DEBUG_REQ(D_INFO, new_req, "new request");
1992         RETURN(0);
1993 }
1994
1995 /*
1996  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1997  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1998  * fine for our small page arrays and doesn't require allocation.  its an
1999  * insertion sort that swaps elements that are strides apart, shrinking the
2000  * stride down until its '1' and the array is sorted.
2001  */
2002 static void sort_brw_pages(struct brw_page **array, int num)
2003 {
2004         int stride, i, j;
2005         struct brw_page *tmp;
2006
2007         if (num == 1)
2008                 return;
2009         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2010                 ;
2011
2012         do {
2013                 stride /= 3;
2014                 for (i = stride ; i < num ; i++) {
2015                         tmp = array[i];
2016                         j = i;
2017                         while (j >= stride && array[j - stride]->off > tmp->off) {
2018                                 array[j] = array[j - stride];
2019                                 j -= stride;
2020                         }
2021                         array[j] = tmp;
2022                 }
2023         } while (stride > 1);
2024 }
2025
2026 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2027 {
2028         LASSERT(ppga != NULL);
2029         OBD_FREE(ppga, sizeof(*ppga) * count);
2030 }
2031
2032 static int brw_interpret(const struct lu_env *env,
2033                          struct ptlrpc_request *req, void *args, int rc)
2034 {
2035         struct osc_brw_async_args *aa = args;
2036         struct osc_extent *ext;
2037         struct osc_extent *tmp;
2038         struct client_obd *cli = aa->aa_cli;
2039         unsigned long transferred = 0;
2040
2041         ENTRY;
2042
2043         rc = osc_brw_fini_request(req, rc);
2044         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2045         /*
2046          * When server returns -EINPROGRESS, client should always retry
2047          * regardless of the number of times the bulk was resent already.
2048          */
2049         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2050                 if (req->rq_import_generation !=
2051                     req->rq_import->imp_generation) {
2052                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2053                                ""DOSTID", rc = %d.\n",
2054                                req->rq_import->imp_obd->obd_name,
2055                                POSTID(&aa->aa_oa->o_oi), rc);
2056                 } else if (rc == -EINPROGRESS ||
2057                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2058                         rc = osc_brw_redo_request(req, aa, rc);
2059                 } else {
2060                         CERROR("%s: too many resent retries for object: "
2061                                "%llu:%llu, rc = %d.\n",
2062                                req->rq_import->imp_obd->obd_name,
2063                                POSTID(&aa->aa_oa->o_oi), rc);
2064                 }
2065
2066                 if (rc == 0)
2067                         RETURN(0);
2068                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2069                         rc = -EIO;
2070         }
2071
2072         if (rc == 0) {
2073                 struct obdo *oa = aa->aa_oa;
2074                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2075                 unsigned long valid = 0;
2076                 struct cl_object *obj;
2077                 struct osc_async_page *last;
2078
2079                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2080                 obj = osc2cl(last->oap_obj);
2081
2082                 cl_object_attr_lock(obj);
2083                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2084                         attr->cat_blocks = oa->o_blocks;
2085                         valid |= CAT_BLOCKS;
2086                 }
2087                 if (oa->o_valid & OBD_MD_FLMTIME) {
2088                         attr->cat_mtime = oa->o_mtime;
2089                         valid |= CAT_MTIME;
2090                 }
2091                 if (oa->o_valid & OBD_MD_FLATIME) {
2092                         attr->cat_atime = oa->o_atime;
2093                         valid |= CAT_ATIME;
2094                 }
2095                 if (oa->o_valid & OBD_MD_FLCTIME) {
2096                         attr->cat_ctime = oa->o_ctime;
2097                         valid |= CAT_CTIME;
2098                 }
2099
2100                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2101                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2102                         loff_t last_off = last->oap_count + last->oap_obj_off +
2103                                 last->oap_page_off;
2104
2105                         /* Change file size if this is an out of quota or
2106                          * direct IO write and it extends the file size */
2107                         if (loi->loi_lvb.lvb_size < last_off) {
2108                                 attr->cat_size = last_off;
2109                                 valid |= CAT_SIZE;
2110                         }
2111                         /* Extend KMS if it's not a lockless write */
2112                         if (loi->loi_kms < last_off &&
2113                             oap2osc_page(last)->ops_srvlock == 0) {
2114                                 attr->cat_kms = last_off;
2115                                 valid |= CAT_KMS;
2116                         }
2117                 }
2118
2119                 if (valid != 0)
2120                         cl_object_attr_update(env, obj, attr, valid);
2121                 cl_object_attr_unlock(obj);
2122         }
2123         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2124         aa->aa_oa = NULL;
2125
2126         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2127                 osc_inc_unstable_pages(req);
2128
2129         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2130                 list_del_init(&ext->oe_link);
2131                 osc_extent_finish(env, ext, 1,
2132                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2133         }
2134         LASSERT(list_empty(&aa->aa_exts));
2135         LASSERT(list_empty(&aa->aa_oaps));
2136
2137         transferred = (req->rq_bulk == NULL ? /* short io */
2138                        aa->aa_requested_nob :
2139                        req->rq_bulk->bd_nob_transferred);
2140
2141         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2142         ptlrpc_lprocfs_brw(req, transferred);
2143
2144         spin_lock(&cli->cl_loi_list_lock);
2145         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2146          * is called so we know whether to go to sync BRWs or wait for more
2147          * RPCs to complete */
2148         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2149                 cli->cl_w_in_flight--;
2150         else
2151                 cli->cl_r_in_flight--;
2152         osc_wake_cache_waiters(cli);
2153         spin_unlock(&cli->cl_loi_list_lock);
2154
2155         osc_io_unplug(env, cli, NULL);
2156         RETURN(rc);
2157 }
2158
2159 static void brw_commit(struct ptlrpc_request *req)
2160 {
2161         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2162          * this called via the rq_commit_cb, I need to ensure
2163          * osc_dec_unstable_pages is still called. Otherwise unstable
2164          * pages may be leaked. */
2165         spin_lock(&req->rq_lock);
2166         if (likely(req->rq_unstable)) {
2167                 req->rq_unstable = 0;
2168                 spin_unlock(&req->rq_lock);
2169
2170                 osc_dec_unstable_pages(req);
2171         } else {
2172                 req->rq_committed = 1;
2173                 spin_unlock(&req->rq_lock);
2174         }
2175 }
2176
2177 /**
2178  * Build an RPC by the list of extent @ext_list. The caller must ensure
2179  * that the total pages in this list are NOT over max pages per RPC.
2180  * Extents in the list must be in OES_RPC state.
2181  */
2182 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2183                   struct list_head *ext_list, int cmd)
2184 {
2185         struct ptlrpc_request           *req = NULL;
2186         struct osc_extent               *ext;
2187         struct brw_page                 **pga = NULL;
2188         struct osc_brw_async_args       *aa = NULL;
2189         struct obdo                     *oa = NULL;
2190         struct osc_async_page           *oap;
2191         struct osc_object               *obj = NULL;
2192         struct cl_req_attr              *crattr = NULL;
2193         loff_t                          starting_offset = OBD_OBJECT_EOF;
2194         loff_t                          ending_offset = 0;
2195         int                             mpflag = 0;
2196         int                             mem_tight = 0;
2197         int                             page_count = 0;
2198         bool                            soft_sync = false;
2199         bool                            interrupted = false;
2200         bool                            ndelay = false;
2201         int                             i;
2202         int                             grant = 0;
2203         int                             rc;
2204         __u32                           layout_version = 0;
2205         LIST_HEAD(rpc_list);
2206         struct ost_body                 *body;
2207         ENTRY;
2208         LASSERT(!list_empty(ext_list));
2209
2210         /* add pages into rpc_list to build BRW rpc */
2211         list_for_each_entry(ext, ext_list, oe_link) {
2212                 LASSERT(ext->oe_state == OES_RPC);
2213                 mem_tight |= ext->oe_memalloc;
2214                 grant += ext->oe_grants;
2215                 page_count += ext->oe_nr_pages;
2216                 layout_version = max(layout_version, ext->oe_layout_version);
2217                 if (obj == NULL)
2218                         obj = ext->oe_obj;
2219         }
2220
2221         soft_sync = osc_over_unstable_soft_limit(cli);
2222         if (mem_tight)
2223                 mpflag = cfs_memory_pressure_get_and_set();
2224
2225         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2226         if (pga == NULL)
2227                 GOTO(out, rc = -ENOMEM);
2228
2229         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2230         if (oa == NULL)
2231                 GOTO(out, rc = -ENOMEM);
2232
2233         i = 0;
2234         list_for_each_entry(ext, ext_list, oe_link) {
2235                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2236                         if (mem_tight)
2237                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2238                         if (soft_sync)
2239                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2240                         pga[i] = &oap->oap_brw_page;
2241                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2242                         i++;
2243
2244                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2245                         if (starting_offset == OBD_OBJECT_EOF ||
2246                             starting_offset > oap->oap_obj_off)
2247                                 starting_offset = oap->oap_obj_off;
2248                         else
2249                                 LASSERT(oap->oap_page_off == 0);
2250                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2251                                 ending_offset = oap->oap_obj_off +
2252                                                 oap->oap_count;
2253                         else
2254                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2255                                         PAGE_SIZE);
2256                         if (oap->oap_interrupted)
2257                                 interrupted = true;
2258                 }
2259                 if (ext->oe_ndelay)
2260                         ndelay = true;
2261         }
2262
2263         /* first page in the list */
2264         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2265
2266         crattr = &osc_env_info(env)->oti_req_attr;
2267         memset(crattr, 0, sizeof(*crattr));
2268         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2269         crattr->cra_flags = ~0ULL;
2270         crattr->cra_page = oap2cl_page(oap);
2271         crattr->cra_oa = oa;
2272         cl_req_attr_set(env, osc2cl(obj), crattr);
2273
2274         if (cmd == OBD_BRW_WRITE) {
2275                 oa->o_grant_used = grant;
2276                 if (layout_version > 0) {
2277                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2278                                PFID(&oa->o_oi.oi_fid), layout_version);
2279
2280                         oa->o_layout_version = layout_version;
2281                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2282                 }
2283         }
2284
2285         sort_brw_pages(pga, page_count);
2286         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2287         if (rc != 0) {
2288                 CERROR("prep_req failed: %d\n", rc);
2289                 GOTO(out, rc);
2290         }
2291
2292         req->rq_commit_cb = brw_commit;
2293         req->rq_interpret_reply = brw_interpret;
2294         req->rq_memalloc = mem_tight != 0;
2295         oap->oap_request = ptlrpc_request_addref(req);
2296         if (interrupted && !req->rq_intr)
2297                 ptlrpc_mark_interrupted(req);
2298         if (ndelay) {
2299                 req->rq_no_resend = req->rq_no_delay = 1;
2300                 /* probably set a shorter timeout value.
2301                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2302                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2303         }
2304
2305         /* Need to update the timestamps after the request is built in case
2306          * we race with setattr (locally or in queue at OST).  If OST gets
2307          * later setattr before earlier BRW (as determined by the request xid),
2308          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2309          * way to do this in a single call.  bug 10150 */
2310         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2311         crattr->cra_oa = &body->oa;
2312         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2313         cl_req_attr_set(env, osc2cl(obj), crattr);
2314         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2315
2316         aa = ptlrpc_req_async_args(aa, req);
2317         INIT_LIST_HEAD(&aa->aa_oaps);
2318         list_splice_init(&rpc_list, &aa->aa_oaps);
2319         INIT_LIST_HEAD(&aa->aa_exts);
2320         list_splice_init(ext_list, &aa->aa_exts);
2321
2322         spin_lock(&cli->cl_loi_list_lock);
2323         starting_offset >>= PAGE_SHIFT;
2324         if (cmd == OBD_BRW_READ) {
2325                 cli->cl_r_in_flight++;
2326                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329                                       starting_offset + 1);
2330         } else {
2331                 cli->cl_w_in_flight++;
2332                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2333                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2334                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335                                       starting_offset + 1);
2336         }
2337         spin_unlock(&cli->cl_loi_list_lock);
2338
2339         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2340                   page_count, aa, cli->cl_r_in_flight,
2341                   cli->cl_w_in_flight);
2342         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2343
2344         ptlrpcd_add_req(req);
2345         rc = 0;
2346         EXIT;
2347
2348 out:
2349         if (mem_tight != 0)
2350                 cfs_memory_pressure_restore(mpflag);
2351
2352         if (rc != 0) {
2353                 LASSERT(req == NULL);
2354
2355                 if (oa)
2356                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2357                 if (pga)
2358                         OBD_FREE(pga, sizeof(*pga) * page_count);
2359                 /* this should happen rarely and is pretty bad, it makes the
2360                  * pending list not follow the dirty order */
2361                 while (!list_empty(ext_list)) {
2362                         ext = list_entry(ext_list->next, struct osc_extent,
2363                                          oe_link);
2364                         list_del_init(&ext->oe_link);
2365                         osc_extent_finish(env, ext, 0, rc);
2366                 }
2367         }
2368         RETURN(rc);
2369 }
2370
2371 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2372 {
2373         int set = 0;
2374
2375         LASSERT(lock != NULL);
2376
2377         lock_res_and_lock(lock);
2378
2379         if (lock->l_ast_data == NULL)
2380                 lock->l_ast_data = data;
2381         if (lock->l_ast_data == data)
2382                 set = 1;
2383
2384         unlock_res_and_lock(lock);
2385
2386         return set;
2387 }
2388
2389 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2390                      void *cookie, struct lustre_handle *lockh,
2391                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2392                      int errcode)
2393 {
2394         bool intent = *flags & LDLM_FL_HAS_INTENT;
2395         int rc;
2396         ENTRY;
2397
2398         /* The request was created before ldlm_cli_enqueue call. */
2399         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2400                 struct ldlm_reply *rep;
2401
2402                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2403                 LASSERT(rep != NULL);
2404
2405                 rep->lock_policy_res1 =
2406                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2407                 if (rep->lock_policy_res1)
2408                         errcode = rep->lock_policy_res1;
2409                 if (!speculative)
2410                         *flags |= LDLM_FL_LVB_READY;
2411         } else if (errcode == ELDLM_OK) {
2412                 *flags |= LDLM_FL_LVB_READY;
2413         }
2414
2415         /* Call the update callback. */
2416         rc = (*upcall)(cookie, lockh, errcode);
2417
2418         /* release the reference taken in ldlm_cli_enqueue() */
2419         if (errcode == ELDLM_LOCK_MATCHED)
2420                 errcode = ELDLM_OK;
2421         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2422                 ldlm_lock_decref(lockh, mode);
2423
2424         RETURN(rc);
2425 }
2426
2427 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2428                           void *args, int rc)
2429 {
2430         struct osc_enqueue_args *aa = args;
2431         struct ldlm_lock *lock;
2432         struct lustre_handle *lockh = &aa->oa_lockh;
2433         enum ldlm_mode mode = aa->oa_mode;
2434         struct ost_lvb *lvb = aa->oa_lvb;
2435         __u32 lvb_len = sizeof(*lvb);
2436         __u64 flags = 0;
2437
2438         ENTRY;
2439
2440         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2441          * be valid. */
2442         lock = ldlm_handle2lock(lockh);
2443         LASSERTF(lock != NULL,
2444                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2445                  lockh->cookie, req, aa);
2446
2447         /* Take an additional reference so that a blocking AST that
2448          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2449          * to arrive after an upcall has been executed by
2450          * osc_enqueue_fini(). */
2451         ldlm_lock_addref(lockh, mode);
2452
2453         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2454         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2455
2456         /* Let CP AST to grant the lock first. */
2457         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2458
2459         if (aa->oa_speculative) {
2460                 LASSERT(aa->oa_lvb == NULL);
2461                 LASSERT(aa->oa_flags == NULL);
2462                 aa->oa_flags = &flags;
2463         }
2464
2465         /* Complete obtaining the lock procedure. */
2466         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2467                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2468                                    lockh, rc);
2469         /* Complete osc stuff. */
2470         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2471                               aa->oa_flags, aa->oa_speculative, rc);
2472
2473         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2474
2475         ldlm_lock_decref(lockh, mode);
2476         LDLM_LOCK_PUT(lock);
2477         RETURN(rc);
2478 }
2479
/* Sentinel request-set value.  osc_enqueue_base() compares its rqset argument
 * against it and, on a match, queues the request with ptlrpcd_add_req()
 * instead of adding it to the given set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2481
2482 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2483  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2484  * other synchronous requests, however keeping some locks and trying to obtain
2485  * others may take a considerable amount of time in a case of ost failure; and
2486  * when other sync requests do not get released lock from a client, the client
2487  * is evicted from the cluster -- such scenarious make the life difficult, so
2488  * release locks just after they are obtained. */
2489 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2490                      __u64 *flags, union ldlm_policy_data *policy,
2491                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2492                      void *cookie, struct ldlm_enqueue_info *einfo,
2493                      struct ptlrpc_request_set *rqset, int async,
2494                      bool speculative)
2495 {
2496         struct obd_device *obd = exp->exp_obd;
2497         struct lustre_handle lockh = { 0 };
2498         struct ptlrpc_request *req = NULL;
2499         int intent = *flags & LDLM_FL_HAS_INTENT;
2500         __u64 match_flags = *flags;
2501         enum ldlm_mode mode;
2502         int rc;
2503         ENTRY;
2504
2505         /* Filesystem lock extents are extended to page boundaries so that
2506          * dealing with the page cache is a little smoother.  */
2507         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2508         policy->l_extent.end |= ~PAGE_MASK;
2509
2510         /* Next, search for already existing extent locks that will cover us */
2511         /* If we're trying to read, we also search for an existing PW lock.  The
2512          * VFS and page cache already protect us locally, so lots of readers/
2513          * writers can share a single PW lock.
2514          *
2515          * There are problems with conversion deadlocks, so instead of
2516          * converting a read lock to a write lock, we'll just enqueue a new
2517          * one.
2518          *
2519          * At some point we should cancel the read lock instead of making them
2520          * send us a blocking callback, but there are problems with canceling
2521          * locks out from other users right now, too. */
2522         mode = einfo->ei_mode;
2523         if (einfo->ei_mode == LCK_PR)
2524                 mode |= LCK_PW;
2525         /* Normal lock requests must wait for the LVB to be ready before
2526          * matching a lock; speculative lock requests do not need to,
2527          * because they will not actually use the lock. */
2528         if (!speculative)
2529                 match_flags |= LDLM_FL_LVB_READY;
2530         if (intent != 0)
2531                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2532         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2533                                einfo->ei_type, policy, mode, &lockh, 0);
2534         if (mode) {
2535                 struct ldlm_lock *matched;
2536
2537                 if (*flags & LDLM_FL_TEST_LOCK)
2538                         RETURN(ELDLM_OK);
2539
2540                 matched = ldlm_handle2lock(&lockh);
2541                 if (speculative) {
2542                         /* This DLM lock request is speculative, and does not
2543                          * have an associated IO request. Therefore if there
2544                          * is already a DLM lock, it wll just inform the
2545                          * caller to cancel the request for this stripe.*/
2546                         lock_res_and_lock(matched);
2547                         if (ldlm_extent_equal(&policy->l_extent,
2548                             &matched->l_policy_data.l_extent))
2549                                 rc = -EEXIST;
2550                         else
2551                                 rc = -ECANCELED;
2552                         unlock_res_and_lock(matched);
2553
2554                         ldlm_lock_decref(&lockh, mode);
2555                         LDLM_LOCK_PUT(matched);
2556                         RETURN(rc);
2557                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2558                         *flags |= LDLM_FL_LVB_READY;
2559
2560                         /* We already have a lock, and it's referenced. */
2561                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2562
2563                         ldlm_lock_decref(&lockh, mode);
2564                         LDLM_LOCK_PUT(matched);
2565                         RETURN(ELDLM_OK);
2566                 } else {
2567                         ldlm_lock_decref(&lockh, mode);
2568                         LDLM_LOCK_PUT(matched);
2569                 }
2570         }
2571
2572         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2573                 RETURN(-ENOLCK);
2574
2575         if (intent) {
2576                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2577                                            &RQF_LDLM_ENQUEUE_LVB);
2578                 if (req == NULL)
2579                         RETURN(-ENOMEM);
2580
2581                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2582                 if (rc) {
2583                         ptlrpc_request_free(req);
2584                         RETURN(rc);
2585                 }
2586
2587                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2588                                      sizeof *lvb);
2589                 ptlrpc_request_set_replen(req);
2590         }
2591
2592         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2593         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2594
2595         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2596                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2597         if (async) {
2598                 if (!rc) {
2599                         struct osc_enqueue_args *aa;
2600                         aa = ptlrpc_req_async_args(aa, req);
2601                         aa->oa_exp         = exp;
2602                         aa->oa_mode        = einfo->ei_mode;
2603                         aa->oa_type        = einfo->ei_type;
2604                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2605                         aa->oa_upcall      = upcall;
2606                         aa->oa_cookie      = cookie;
2607                         aa->oa_speculative = speculative;
2608                         if (!speculative) {
2609                                 aa->oa_flags  = flags;
2610                                 aa->oa_lvb    = lvb;
2611                         } else {
2612                                 /* speculative locks are essentially to enqueue
2613                                  * a DLM lock  in advance, so we don't care
2614                                  * about the result of the enqueue. */
2615                                 aa->oa_lvb    = NULL;
2616                                 aa->oa_flags  = NULL;
2617                         }
2618
2619                         req->rq_interpret_reply = osc_enqueue_interpret;
2620                         if (rqset == PTLRPCD_SET)
2621                                 ptlrpcd_add_req(req);
2622                         else
2623                                 ptlrpc_set_add_req(rqset, req);
2624                 } else if (intent) {
2625                         ptlrpc_req_finished(req);
2626                 }
2627                 RETURN(rc);
2628         }
2629
2630         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2631                               flags, speculative, rc);
2632         if (intent)
2633                 ptlrpc_req_finished(req);
2634
2635         RETURN(rc);
2636 }
2637
2638 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2639                    struct ldlm_res_id *res_id, enum ldlm_type type,
2640                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2641                    __u64 *flags, struct osc_object *obj,
2642                    struct lustre_handle *lockh, int unref)
2643 {
2644         struct obd_device *obd = exp->exp_obd;
2645         __u64 lflags = *flags;
2646         enum ldlm_mode rc;
2647         ENTRY;
2648
2649         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2650                 RETURN(-EIO);
2651
2652         /* Filesystem lock extents are extended to page boundaries so that
2653          * dealing with the page cache is a little smoother */
2654         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2655         policy->l_extent.end |= ~PAGE_MASK;
2656
2657         /* Next, search for already existing extent locks that will cover us */
2658         /* If we're trying to read, we also search for an existing PW lock.  The
2659          * VFS and page cache already protect us locally, so lots of readers/
2660          * writers can share a single PW lock. */
2661         rc = mode;
2662         if (mode == LCK_PR)
2663                 rc |= LCK_PW;
2664         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2665                              res_id, type, policy, rc, lockh, unref);
2666         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2667                 RETURN(rc);
2668
2669         if (obj != NULL) {
2670                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2671
2672                 LASSERT(lock != NULL);
2673                 if (osc_set_lock_data(lock, obj)) {
2674                         lock_res_and_lock(lock);
2675                         if (!ldlm_is_lvb_cached(lock)) {
2676                                 LASSERT(lock->l_ast_data == obj);
2677                                 osc_lock_lvb_update(env, obj, lock, NULL);
2678                                 ldlm_set_lvb_cached(lock);
2679                         }
2680                         unlock_res_and_lock(lock);
2681                 } else {
2682                         ldlm_lock_decref(lockh, rc);
2683                         rc = 0;
2684                 }
2685                 LDLM_LOCK_PUT(lock);
2686         }
2687         RETURN(rc);
2688 }
2689
2690 static int osc_statfs_interpret(const struct lu_env *env,
2691                                 struct ptlrpc_request *req, void *args, int rc)
2692 {
2693         struct osc_async_args *aa = args;
2694         struct obd_statfs *msfs;
2695
2696         ENTRY;
2697         if (rc == -EBADR)
2698                 /*
2699                  * The request has in fact never been sent due to issues at
2700                  * a higher level (LOV).  Exit immediately since the caller
2701                  * is aware of the problem and takes care of the clean up.
2702                  */
2703                 RETURN(rc);
2704
2705         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2706             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2707                 GOTO(out, rc = 0);
2708
2709         if (rc != 0)
2710                 GOTO(out, rc);
2711
2712         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2713         if (msfs == NULL)
2714                 GOTO(out, rc = -EPROTO);
2715
2716         *aa->aa_oi->oi_osfs = *msfs;
2717 out:
2718         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719
2720         RETURN(rc);
2721 }
2722
2723 static int osc_statfs_async(struct obd_export *exp,
2724                             struct obd_info *oinfo, time64_t max_age,
2725                             struct ptlrpc_request_set *rqset)
2726 {
2727         struct obd_device     *obd = class_exp2obd(exp);
2728         struct ptlrpc_request *req;
2729         struct osc_async_args *aa;
2730         int rc;
2731         ENTRY;
2732
2733         if (obd->obd_osfs_age >= max_age) {
2734                 CDEBUG(D_SUPER,
2735                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2736                        obd->obd_name, &obd->obd_osfs,
2737                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2738                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2739                 spin_lock(&obd->obd_osfs_lock);
2740                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2741                 spin_unlock(&obd->obd_osfs_lock);
2742                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2743                 if (oinfo->oi_cb_up)
2744                         oinfo->oi_cb_up(oinfo, 0);
2745
2746                 RETURN(0);
2747         }
2748
2749         /* We could possibly pass max_age in the request (as an absolute
2750          * timestamp or a "seconds.usec ago") so the target can avoid doing
2751          * extra calls into the filesystem if that isn't necessary (e.g.
2752          * during mount that would help a bit).  Having relative timestamps
2753          * is not so great if request processing is slow, while absolute
2754          * timestamps are not ideal because they need time synchronization. */
2755         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2756         if (req == NULL)
2757                 RETURN(-ENOMEM);
2758
2759         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2760         if (rc) {
2761                 ptlrpc_request_free(req);
2762                 RETURN(rc);
2763         }
2764         ptlrpc_request_set_replen(req);
2765         req->rq_request_portal = OST_CREATE_PORTAL;
2766         ptlrpc_at_set_req_timeout(req);
2767
2768         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2769                 /* procfs requests not want stat in wait for avoid deadlock */
2770                 req->rq_no_resend = 1;
2771                 req->rq_no_delay = 1;
2772         }
2773
2774         req->rq_interpret_reply = osc_statfs_interpret;
2775         aa = ptlrpc_req_async_args(aa, req);
2776         aa->aa_oi = oinfo;
2777
2778         ptlrpc_set_add_req(rqset, req);
2779         RETURN(0);
2780 }
2781
2782 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2783                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2784 {
2785         struct obd_device     *obd = class_exp2obd(exp);
2786         struct obd_statfs     *msfs;
2787         struct ptlrpc_request *req;
2788         struct obd_import     *imp = NULL;
2789         int rc;
2790         ENTRY;
2791
2792
2793         /*Since the request might also come from lprocfs, so we need
2794          *sync this with client_disconnect_export Bug15684*/
2795         down_read(&obd->u.cli.cl_sem);
2796         if (obd->u.cli.cl_import)
2797                 imp = class_import_get(obd->u.cli.cl_import);
2798         up_read(&obd->u.cli.cl_sem);
2799         if (!imp)
2800                 RETURN(-ENODEV);
2801
2802         /* We could possibly pass max_age in the request (as an absolute
2803          * timestamp or a "seconds.usec ago") so the target can avoid doing
2804          * extra calls into the filesystem if that isn't necessary (e.g.
2805          * during mount that would help a bit).  Having relative timestamps
2806          * is not so great if request processing is slow, while absolute
2807          * timestamps are not ideal because they need time synchronization. */
2808         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2809
2810         class_import_put(imp);
2811
2812         if (req == NULL)
2813                 RETURN(-ENOMEM);
2814
2815         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2816         if (rc) {
2817                 ptlrpc_request_free(req);
2818                 RETURN(rc);
2819         }
2820         ptlrpc_request_set_replen(req);
2821         req->rq_request_portal = OST_CREATE_PORTAL;
2822         ptlrpc_at_set_req_timeout(req);
2823
2824         if (flags & OBD_STATFS_NODELAY) {
2825                 /* procfs requests not want stat in wait for avoid deadlock */
2826                 req->rq_no_resend = 1;
2827                 req->rq_no_delay = 1;
2828         }
2829
2830         rc = ptlrpc_queue_wait(req);
2831         if (rc)
2832                 GOTO(out, rc);
2833
2834         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2835         if (msfs == NULL)
2836                 GOTO(out, rc = -EPROTO);
2837
2838         *osfs = *msfs;
2839
2840         EXIT;
2841 out:
2842         ptlrpc_req_finished(req);
2843         return rc;
2844 }
2845
2846 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2847                          void *karg, void __user *uarg)
2848 {
2849         struct obd_device *obd = exp->exp_obd;
2850         struct obd_ioctl_data *data = karg;
2851         int rc = 0;
2852
2853         ENTRY;
2854         if (!try_module_get(THIS_MODULE)) {
2855                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2856                        module_name(THIS_MODULE));
2857                 return -EINVAL;
2858         }
2859         switch (cmd) {
2860         case OBD_IOC_CLIENT_RECOVER:
2861                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2862                                            data->ioc_inlbuf1, 0);
2863                 if (rc > 0)
2864                         rc = 0;
2865                 break;
2866         case IOC_OSC_SET_ACTIVE:
2867                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2868                                               data->ioc_offset);
2869                 break;
2870         default:
2871                 rc = -ENOTTY;
2872                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2873                        obd->obd_name, cmd, current_comm(), rc);
2874                 break;
2875         }
2876
2877         module_put(THIS_MODULE);
2878         return rc;
2879 }
2880
2881 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2882                        u32 keylen, void *key, u32 vallen, void *val,
2883                        struct ptlrpc_request_set *set)
2884 {
2885         struct ptlrpc_request *req;
2886         struct obd_device     *obd = exp->exp_obd;
2887         struct obd_import     *imp = class_exp2cliimp(exp);
2888         char                  *tmp;
2889         int                    rc;
2890         ENTRY;
2891
2892         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2893
2894         if (KEY_IS(KEY_CHECKSUM)) {
2895                 if (vallen != sizeof(int))
2896                         RETURN(-EINVAL);
2897                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2898                 RETURN(0);
2899         }
2900
2901         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2902                 sptlrpc_conf_client_adapt(obd);
2903                 RETURN(0);
2904         }
2905
2906         if (KEY_IS(KEY_FLUSH_CTX)) {
2907                 sptlrpc_import_flush_my_ctx(imp);
2908                 RETURN(0);
2909         }
2910
2911         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2912                 struct client_obd *cli = &obd->u.cli;
2913                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2914                 long target = *(long *)val;
2915
2916                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2917                 *(long *)val -= nr;
2918                 RETURN(0);
2919         }
2920
2921         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2922                 RETURN(-EINVAL);
2923
2924         /* We pass all other commands directly to OST. Since nobody calls osc
2925            methods directly and everybody is supposed to go through LOV, we
2926            assume lov checked invalid values for us.
2927            The only recognised values so far are evict_by_nid and mds_conn.
2928            Even if something bad goes through, we'd get a -EINVAL from OST
2929            anyway. */
2930
2931         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2932                                                 &RQF_OST_SET_GRANT_INFO :
2933                                                 &RQF_OBD_SET_INFO);
2934         if (req == NULL)
2935                 RETURN(-ENOMEM);
2936
2937         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2938                              RCL_CLIENT, keylen);
2939         if (!KEY_IS(KEY_GRANT_SHRINK))
2940                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2941                                      RCL_CLIENT, vallen);
2942         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2943         if (rc) {
2944                 ptlrpc_request_free(req);
2945                 RETURN(rc);
2946         }
2947
2948         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2949         memcpy(tmp, key, keylen);
2950         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2951                                                         &RMF_OST_BODY :
2952                                                         &RMF_SETINFO_VAL);
2953         memcpy(tmp, val, vallen);
2954
2955         if (KEY_IS(KEY_GRANT_SHRINK)) {
2956                 struct osc_grant_args *aa;
2957                 struct obdo *oa;
2958
2959                 aa = ptlrpc_req_async_args(aa, req);
2960                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2961                 if (!oa) {
2962                         ptlrpc_req_finished(req);
2963                         RETURN(-ENOMEM);
2964                 }
2965                 *oa = ((struct ost_body *)val)->oa;
2966                 aa->aa_oa = oa;
2967                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2968         }
2969
2970         ptlrpc_request_set_replen(req);
2971         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2972                 LASSERT(set != NULL);
2973                 ptlrpc_set_add_req(set, req);
2974                 ptlrpc_check_set(NULL, set);
2975         } else {
2976                 ptlrpcd_add_req(req);
2977         }
2978
2979         RETURN(0);
2980 }
2981 EXPORT_SYMBOL(osc_set_info_async);
2982
2983 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2984                   struct obd_device *obd, struct obd_uuid *cluuid,
2985                   struct obd_connect_data *data, void *localdata)
2986 {
2987         struct client_obd *cli = &obd->u.cli;
2988
2989         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2990                 long lost_grant;
2991                 long grant;
2992
2993                 spin_lock(&cli->cl_loi_list_lock);
2994                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2995                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2996                         /* restore ocd_grant_blkbits as client page bits */
2997                         data->ocd_grant_blkbits = PAGE_SHIFT;
2998                         grant += cli->cl_dirty_grant;
2999                 } else {
3000                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3001                 }
3002                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3003                 lost_grant = cli->cl_lost_grant;
3004                 cli->cl_lost_grant = 0;
3005                 spin_unlock(&cli->cl_loi_list_lock);
3006
3007                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3008                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3009                        data->ocd_version, data->ocd_grant, lost_grant);
3010         }
3011
3012         RETURN(0);
3013 }
3014 EXPORT_SYMBOL(osc_reconnect);
3015
3016 int osc_disconnect(struct obd_export *exp)
3017 {
3018         struct obd_device *obd = class_exp2obd(exp);
3019         int rc;
3020
3021         rc = client_disconnect_export(exp);
3022         /**
3023          * Initially we put del_shrink_grant before disconnect_export, but it
3024          * causes the following problem if setup (connect) and cleanup
3025          * (disconnect) are tangled together.
3026          *      connect p1                     disconnect p2
3027          *   ptlrpc_connect_import
3028          *     ...............               class_manual_cleanup
3029          *                                     osc_disconnect
3030          *                                     del_shrink_grant
3031          *   ptlrpc_connect_interrupt
3032          *     osc_init_grant
3033          *   add this client to shrink list
3034          *                                      cleanup_osc
3035          * Bang! grant shrink thread trigger the shrink. BUG18662
3036          */
3037         osc_del_grant_list(&obd->u.cli);
3038         return rc;
3039 }
3040 EXPORT_SYMBOL(osc_disconnect);
3041
3042 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3043                                  struct hlist_node *hnode, void *arg)
3044 {
3045         struct lu_env *env = arg;
3046         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3047         struct ldlm_lock *lock;
3048         struct osc_object *osc = NULL;
3049         ENTRY;
3050
3051         lock_res(res);
3052         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3053                 if (lock->l_ast_data != NULL && osc == NULL) {
3054                         osc = lock->l_ast_data;
3055                         cl_object_get(osc2cl(osc));
3056                 }
3057
3058                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3059                  * by the 2nd round of ldlm_namespace_clean() call in
3060                  * osc_import_event(). */
3061                 ldlm_clear_cleaned(lock);
3062         }
3063         unlock_res(res);
3064
3065         if (osc != NULL) {
3066                 osc_object_invalidate(env, osc);
3067                 cl_object_put(env, osc2cl(osc));
3068         }
3069
3070         RETURN(0);
3071 }
3072 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3073
3074 static int osc_import_event(struct obd_device *obd,
3075                             struct obd_import *imp,
3076                             enum obd_import_event event)
3077 {
3078         struct client_obd *cli;
3079         int rc = 0;
3080
3081         ENTRY;
3082         LASSERT(imp->imp_obd == obd);
3083
3084         switch (event) {
3085         case IMP_EVENT_DISCON: {
3086                 cli = &obd->u.cli;
3087                 spin_lock(&cli->cl_loi_list_lock);
3088                 cli->cl_avail_grant = 0;
3089                 cli->cl_lost_grant = 0;
3090                 spin_unlock(&cli->cl_loi_list_lock);
3091                 break;
3092         }
3093         case IMP_EVENT_INACTIVE: {
3094                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3095                 break;
3096         }
3097         case IMP_EVENT_INVALIDATE: {
3098                 struct ldlm_namespace *ns = obd->obd_namespace;
3099                 struct lu_env         *env;
3100                 __u16                  refcheck;
3101
3102                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3103
3104                 env = cl_env_get(&refcheck);
3105                 if (!IS_ERR(env)) {
3106                         osc_io_unplug(env, &obd->u.cli, NULL);
3107
3108                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3109                                                  osc_ldlm_resource_invalidate,
3110                                                  env, 0);
3111                         cl_env_put(env, &refcheck);
3112
3113                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3114                 } else
3115                         rc = PTR_ERR(env);
3116                 break;
3117         }
3118         case IMP_EVENT_ACTIVE: {
3119                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3120                 break;
3121         }
3122         case IMP_EVENT_OCD: {
3123                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3124
3125                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3126                         osc_init_grant(&obd->u.cli, ocd);
3127
3128                 /* See bug 7198 */
3129                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3130                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3131
3132                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3133                 break;
3134         }
3135         case IMP_EVENT_DEACTIVATE: {
3136                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3137                 break;
3138         }
3139         case IMP_EVENT_ACTIVATE: {
3140                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3141                 break;
3142         }
3143         default:
3144                 CERROR("Unknown import event %d\n", event);
3145                 LBUG();
3146         }
3147         RETURN(rc);
3148 }
3149
3150 /**
3151  * Determine whether the lock can be canceled before replaying the lock
3152  * during recovery, see bug16774 for detailed information.
3153  *
3154  * \retval zero the lock can't be canceled
3155  * \retval other ok to cancel
3156  */
3157 static int osc_cancel_weight(struct ldlm_lock *lock)
3158 {
3159         /*
3160          * Cancel all unused and granted extent lock.
3161          */
3162         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3163             ldlm_is_granted(lock) &&
3164             osc_ldlm_weigh_ast(lock) == 0)
3165                 RETURN(1);
3166
3167         RETURN(0);
3168 }
3169
3170 static int brw_queue_work(const struct lu_env *env, void *data)
3171 {
3172         struct client_obd *cli = data;
3173
3174         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3175
3176         osc_io_unplug(env, cli, NULL);
3177         RETURN(0);
3178 }
3179
3180 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3181 {
3182         struct client_obd *cli = &obd->u.cli;
3183         void *handler;
3184         int rc;
3185
3186         ENTRY;
3187
3188         rc = ptlrpcd_addref();
3189         if (rc)
3190                 RETURN(rc);
3191
3192         rc = client_obd_setup(obd, lcfg);
3193         if (rc)
3194                 GOTO(out_ptlrpcd, rc);
3195
3196
3197         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3198         if (IS_ERR(handler))
3199                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3200         cli->cl_writeback_work = handler;
3201
3202         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3203         if (IS_ERR(handler))
3204                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3205         cli->cl_lru_work = handler;
3206
3207         rc = osc_quota_setup(obd);
3208         if (rc)
3209                 GOTO(out_ptlrpcd_work, rc);
3210
3211         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3212         osc_update_next_shrink(cli);
3213
3214         RETURN(rc);
3215
3216 out_ptlrpcd_work:
3217         if (cli->cl_writeback_work != NULL) {
3218                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3219                 cli->cl_writeback_work = NULL;
3220         }
3221         if (cli->cl_lru_work != NULL) {
3222                 ptlrpcd_destroy_work(cli->cl_lru_work);
3223                 cli->cl_lru_work = NULL;
3224         }
3225         client_obd_cleanup(obd);
3226 out_ptlrpcd:
3227         ptlrpcd_decref();
3228         RETURN(rc);
3229 }
3230 EXPORT_SYMBOL(osc_setup_common);
3231
3232 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3233 {
3234         struct client_obd *cli = &obd->u.cli;
3235         int                adding;
3236         int                added;
3237         int                req_count;
3238         int                rc;
3239
3240         ENTRY;
3241
3242         rc = osc_setup_common(obd, lcfg);
3243         if (rc < 0)
3244                 RETURN(rc);
3245
3246         rc = osc_tunables_init(obd);
3247         if (rc)
3248                 RETURN(rc);
3249
3250         /*
3251          * We try to control the total number of requests with a upper limit
3252          * osc_reqpool_maxreqcount. There might be some race which will cause
3253          * over-limit allocation, but it is fine.
3254          */
3255         req_count = atomic_read(&osc_pool_req_count);
3256         if (req_count < osc_reqpool_maxreqcount) {
3257                 adding = cli->cl_max_rpcs_in_flight + 2;
3258                 if (req_count + adding > osc_reqpool_maxreqcount)
3259                         adding = osc_reqpool_maxreqcount - req_count;
3260
3261                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3262                 atomic_add(added, &osc_pool_req_count);
3263         }
3264
3265         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3266
3267         spin_lock(&osc_shrink_lock);
3268         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3269         spin_unlock(&osc_shrink_lock);
3270         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3271         cli->cl_import->imp_idle_debug = D_HA;
3272
3273         RETURN(0);
3274 }
3275
3276 int osc_precleanup_common(struct obd_device *obd)
3277 {
3278         struct client_obd *cli = &obd->u.cli;
3279         ENTRY;
3280
3281         /* LU-464
3282          * for echo client, export may be on zombie list, wait for
3283          * zombie thread to cull it, because cli.cl_import will be
3284          * cleared in client_disconnect_export():
3285          *   class_export_destroy() -> obd_cleanup() ->
3286          *   echo_device_free() -> echo_client_cleanup() ->
3287          *   obd_disconnect() -> osc_disconnect() ->
3288          *   client_disconnect_export()
3289          */
3290         obd_zombie_barrier();
3291         if (cli->cl_writeback_work) {
3292                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3293                 cli->cl_writeback_work = NULL;
3294         }
3295
3296         if (cli->cl_lru_work) {
3297                 ptlrpcd_destroy_work(cli->cl_lru_work);
3298                 cli->cl_lru_work = NULL;
3299         }
3300
3301         obd_cleanup_client_import(obd);
3302         RETURN(0);
3303 }
3304 EXPORT_SYMBOL(osc_precleanup_common);
3305
3306 static int osc_precleanup(struct obd_device *obd)
3307 {
3308         ENTRY;
3309
3310         osc_precleanup_common(obd);
3311
3312         ptlrpc_lprocfs_unregister_obd(obd);
3313         RETURN(0);
3314 }
3315
3316 int osc_cleanup_common(struct obd_device *obd)
3317 {
3318         struct client_obd *cli = &obd->u.cli;
3319         int rc;
3320
3321         ENTRY;
3322
3323         spin_lock(&osc_shrink_lock);
3324         list_del(&cli->cl_shrink_list);
3325         spin_unlock(&osc_shrink_lock);
3326
3327         /* lru cleanup */
3328         if (cli->cl_cache != NULL) {
3329                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3330                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3331                 list_del_init(&cli->cl_lru_osc);
3332                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3333                 cli->cl_lru_left = NULL;
3334                 cl_cache_decref(cli->cl_cache);
3335                 cli->cl_cache = NULL;
3336         }
3337
3338         /* free memory of osc quota cache */
3339         osc_quota_cleanup(obd);
3340
3341         rc = client_obd_cleanup(obd);
3342
3343         ptlrpcd_decref();
3344         RETURN(rc);
3345 }
3346 EXPORT_SYMBOL(osc_cleanup_common);
3347
3348 static const struct obd_ops osc_obd_ops = {
3349         .o_owner                = THIS_MODULE,
3350         .o_setup                = osc_setup,
3351         .o_precleanup           = osc_precleanup,
3352         .o_cleanup              = osc_cleanup_common,
3353         .o_add_conn             = client_import_add_conn,
3354         .o_del_conn             = client_import_del_conn,
3355         .o_connect              = client_connect_import,
3356         .o_reconnect            = osc_reconnect,
3357         .o_disconnect           = osc_disconnect,
3358         .o_statfs               = osc_statfs,
3359         .o_statfs_async         = osc_statfs_async,
3360         .o_create               = osc_create,
3361         .o_destroy              = osc_destroy,
3362         .o_getattr              = osc_getattr,
3363         .o_setattr              = osc_setattr,
3364         .o_iocontrol            = osc_iocontrol,
3365         .o_set_info_async       = osc_set_info_async,
3366         .o_import_event         = osc_import_event,
3367         .o_quotactl             = osc_quotactl,
3368 };
3369
/*
 * Global list of client obds, linked through cl_shrink_list in osc_setup()
 * and unlinked in osc_cleanup_common(); protected by osc_shrink_lock.
 */
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

/* Handle returned by set_shrinker() in osc_init(); removed in osc_exit(). */
static struct shrinker *osc_cache_shrinker;
3373
3374 #ifndef HAVE_SHRINKER_COUNT
3375 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3376 {
3377         struct shrink_control scv = {
3378                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3379                 .gfp_mask   = shrink_param(sc, gfp_mask)
3380         };
3381         (void)osc_cache_shrink_scan(shrinker, &scv);
3382
3383         return osc_cache_shrink_count(shrinker, &scv);
3384 }
3385 #endif
3386
3387 static int __init osc_init(void)
3388 {
3389         unsigned int reqpool_size;
3390         unsigned int reqsize;
3391         int rc;
3392         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3393                          osc_cache_shrink_count, osc_cache_shrink_scan);
3394         ENTRY;
3395
3396         /* print an address of _any_ initialized kernel symbol from this
3397          * module, to allow debugging with gdb that doesn't support data
3398          * symbols from modules.*/
3399         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3400
3401         rc = lu_kmem_init(osc_caches);
3402         if (rc)
3403                 RETURN(rc);
3404
3405         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3406                                  LUSTRE_OSC_NAME, &osc_device_type);
3407         if (rc)
3408                 GOTO(out_kmem, rc);
3409
3410         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3411
3412         /* This is obviously too much memory, only prevent overflow here */
3413         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3414                 GOTO(out_type, rc = -EINVAL);
3415
3416         reqpool_size = osc_reqpool_mem_max << 20;
3417
3418         reqsize = 1;
3419         while (reqsize < OST_IO_MAXREQSIZE)
3420                 reqsize = reqsize << 1;
3421
3422         /*
3423          * We don't enlarge the request count in OSC pool according to
3424          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3425          * tried after normal allocation failed. So a small OSC pool won't
3426          * cause much performance degression in most of cases.
3427          */
3428         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3429
3430         atomic_set(&osc_pool_req_count, 0);
3431         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3432                                           ptlrpc_add_rqs_to_pool);
3433
3434         if (osc_rq_pool == NULL)
3435                 GOTO(out_type, rc = -ENOMEM);
3436
3437         rc = osc_start_grant_work();
3438         if (rc != 0)
3439                 GOTO(out_req_pool, rc);
3440
3441         RETURN(rc);
3442
3443 out_req_pool:
3444         ptlrpc_free_rq_pool(osc_rq_pool);
3445 out_type:
3446         class_unregister_type(LUSTRE_OSC_NAME);
3447 out_kmem:
3448         lu_kmem_fini(osc_caches);
3449
3450         RETURN(rc);
3451 }
3452
3453 static void __exit osc_exit(void)
3454 {
3455         osc_stop_grant_work();
3456         remove_shrinker(osc_cache_shrinker);
3457         class_unregister_type(LUSTRE_OSC_NAME);
3458         lu_kmem_fini(osc_caches);
3459         ptlrpc_free_rq_pool(osc_rq_pool);
3460 }
3461
3462 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3463 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3464 MODULE_VERSION(LUSTRE_VERSION_STRING);
3465 MODULE_LICENSE("GPL");
3466
3467 module_init(osc_init);
3468 module_exit(osc_exit);