lustre/osc/osc_request.c (fs/lustre-release.git @ 3a76729d4652d9f5a9880834d890b1396a22f402)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
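
/*
 * A minimal sketch of the synchronous OST RPC pattern used by
 * osc_getattr() above and osc_setattr() below; all identifiers are the
 * ones already used in this file:
 *
 *      req = ptlrpc_request_alloc(imp, &RQF_...);       // reserve request
 *      rc = ptlrpc_request_pack(req, version, opcode);  // lay out buffers
 *      osc_pack_req_body(req, oa);                      // fill the body
 *      ptlrpc_request_set_replen(req);                  // size the reply
 *      rc = ptlrpc_queue_wait(req);                     // send and block
 *      body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 *      ptlrpc_req_finished(req);                        // drop reference
 */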

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
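
/*
 * A minimal caller-side sketch for osc_setattr_async(): the upcall runs
 * from osc_setattr_interpret() once the reply (or an error) arrives, so
 * the cookie must stay valid until then.  The my_* names here are
 * illustrative only, not part of this file:
 *
 *      static int my_upcall(void *cookie, int rc)
 *      {
 *              struct my_ctx *ctx = cookie;
 *
 *              complete(&ctx->mc_done);        // wake the waiter
 *              return rc;
 *      }
 *
 *      rc = osc_setattr_async(exp, oa, my_upcall, ctx, PTLRPCD_SET);
 */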

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally any locks matching @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the
 * number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all,
         * in which case we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
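
/*
 * Note on early lock cancellation (ELC): the @cancels list filled in
 * above is handed to ldlm_prep_elc_req() in osc_destroy() below, which
 * packs the cancel records into the OST_DESTROY request itself, saving
 * a separate cancel RPC for locks that are about to become useless.
 */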

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
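
/*
 * Why the inc-then-maybe-dec dance in osc_can_send_destroy() is safe:
 * the increment optimistically claims a slot; if that overshoots
 * cl_max_rpcs_in_flight the claim is rolled back, and the extra
 * wake_up() covers the window where another destroy completed between
 * the two atomic operations, so no waiter in osc_destroy() is left
 * sleeping on cl_destroy_waitq.
 */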

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
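
/*
 * Worked example for the o_undirty estimate above (illustrative numbers,
 * assuming 4KiB pages and cl_dirty_max_pages below the product): with
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8,
 * nrpages = 256 * 9 = 2304, so undirty = 2304 << 12 = 9 MiB before any
 * GRANT_PARAM extent tax is added; the result is then capped well below
 * OBD_MAX_GRANT to leave the server room for its own extent tax.
 */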

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
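
/*
 * Worked example (illustrative, 4KiB pages): with
 * cl_max_pages_per_rpc = 256 (1 MiB per RPC) and
 * cl_max_rpcs_in_flight = 8, the first-stage target is 9 MiB; once
 * cl_avail_grant is already at or below that, the target drops to a
 * single RPC's worth, 1 MiB.
 */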

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we
         * have been evicted, it's the new avail_grant amount, and
         * cl_dirty_pages will drop to 0 as in-flight RPCs fail out;
         * otherwise, it's avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
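
/*
 * Chunk-alignment example for the GRANT_PARAM path above (illustrative
 * numbers): with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16,
 * cl_chunkbits = 16, so a chunk is 16 pages and chunk_mask = ~15; a
 * cl_max_pages_per_rpc of 100 is rounded up to (100 + 15) & ~15 = 112.
 */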

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file; it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
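
/*
 * Example (illustrative, assuming three full 4096-byte pages): a read
 * that returns nob_read = 5000 leaves page 0 untouched, zero-fills
 * page 1 from byte 904 onward, and zero-fills page 2 entirely.
 */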

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
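
/*
 * Merge semantics (illustrative): two pages collapse into one niobuf
 * only when their flags are identical and they are byte-contiguous,
 * e.g. {off 0, count 4096} followed by {off 4096, count 4096}.
 * Differing flags never merge; a warning is logged only when the
 * difference lies outside the known-safe mask above.
 */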

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}
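
/*
 * Dispatch note: obd_t10_cksum2dif() returns a non-NULL fn only for
 * T10-PI checksum types, so plain CRC32/Adler traffic always takes the
 * osc_checksum_bulk() path, while T10-PI degrades to -EOPNOTSUPP on
 * kernels built without CONFIG_CRC_T10DIF (see the #else block above).
 */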

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * oa passed in contains valid o_uid and o_gid for these two
         * operations, and filling them is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid
         * breaking other process logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);
1408
1409         if (short_io_size != 0) {
1410                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1411                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1412                         body->oa.o_flags = 0;
1413                 }
1414                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1415                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1416                        short_io_size);
1417                 if (opc == OST_WRITE) {
1418                         short_io_buf = req_capsule_client_get(pill,
1419                                                               &RMF_SHORT_IO);
1420                         LASSERT(short_io_buf != NULL);
1421                 }
1422         }
1423
1424         LASSERT(page_count > 0);
1425         pg_prev = pga[0];
1426         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1427                 struct brw_page *pg = pga[i];
1428                 int poff = pg->off & ~PAGE_MASK;
1429
1430                 LASSERT(pg->count > 0);
1431                 /* make sure there is no gap in the middle of page array */
1432                 LASSERTF(page_count == 1 ||
1433                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1434                           ergo(i > 0 && i < page_count - 1,
1435                                poff == 0 && pg->count == PAGE_SIZE)   &&
1436                           ergo(i == page_count - 1, poff == 0)),
1437                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1438                          i, page_count, pg, pg->off, pg->count);
1439                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1440                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1441                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1442                          i, page_count,
1443                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1444                          pg_prev->pg, page_private(pg_prev->pg),
1445                          pg_prev->pg->index, pg_prev->off);
1446                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1447                         (pg->flag & OBD_BRW_SRVLOCK));
1448                 if (short_io_size != 0 && opc == OST_WRITE) {
1449                         unsigned char *ptr = kmap_atomic(pg->pg);
1450
1451                         LASSERT(short_io_size >= requested_nob + pg->count);
1452                         memcpy(short_io_buf + requested_nob,
1453                                ptr + poff,
1454                                pg->count);
1455                         kunmap_atomic(ptr);
1456                 } else if (short_io_size == 0) {
1457                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1458                                                          pg->count);
1459                 }
1460                 requested_nob += pg->count;
1461
1462                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1463                         niobuf--;
1464                         niobuf->rnb_len += pg->count;
1465                 } else {
1466                         niobuf->rnb_offset = pg->off;
1467                         niobuf->rnb_len    = pg->count;
1468                         niobuf->rnb_flags  = pg->flag;
1469                 }
1470                 pg_prev = pg;
1471         }
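        /*
         * Worked example for the merge above (illustrative, assuming
         * can_merge_pages() checks file-offset contiguity and compatible
         * brw flags): three full 4096-byte pages at offsets 0, 4096 and
         * 8192 collapse into one remote niobuf { rnb_offset = 0,
         * rnb_len = 12288 }, while a following page at offset 20480
         * starts a new entry, so the final entry count equals the
         * precomputed niocount, not page_count.
         */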
1472
1473         LASSERTF((void *)(niobuf - niocount) ==
1474                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1475                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1476                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1477
1478         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1479         if (resend) {
1480                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1481                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1482                         body->oa.o_flags = 0;
1483                 }
1484                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1485         }
1486
1487         if (osc_should_shrink_grant(cli))
1488                 osc_shrink_grant_local(cli, &body->oa);
1489
1490         /* size[REQ_REC_OFF] is still sizeof(*body) */
1491         if (opc == OST_WRITE) {
1492                 if (cli->cl_checksum &&
1493                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1494                         /* store cl_cksum_type in a local variable since
1495                          * it can be changed via lprocfs */
1496                         enum cksum_types cksum_type = cli->cl_cksum_type;
1497
1498                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1499                                 body->oa.o_flags = 0;
1500
1501                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1502                                                                 cksum_type);
1503                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1504
1505                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1506                                                   requested_nob, page_count,
1507                                                   pga, OST_WRITE,
1508                                                   &body->oa.o_cksum);
1509                         if (rc < 0) {
1510                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1511                                        rc);
1512                                 GOTO(out, rc);
1513                         }
1514                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1515                                body->oa.o_cksum);
1516
1517                         /* save this in 'oa', too, for later checking */
1518                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1519                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1520                                                            cksum_type);
1521                 } else {
1522                         /* clear out the checksum flag, in case this is a
1523                          * resend but cl_checksum is no longer set. b=11238 */
1524                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1525                 }
1526                 oa->o_cksum = body->oa.o_cksum;
1527                 /* 1 RC per niobuf */
1528                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1529                                      sizeof(__u32) * niocount);
1530         } else {
1531                 if (cli->cl_checksum &&
1532                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1533                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1534                                 body->oa.o_flags = 0;
1535                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1536                                 cli->cl_cksum_type);
1537                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1538                 }
1539
1540                 /* The client cksum has already been copied to the wire obdo in the
1541                  * previous lustre_set_wire_obdo(), and in case a bulk-read is being
1542                  * resent due to a cksum error, this will allow the server to
1543                  * check+dump the pages on its side */
1544         }
1545         ptlrpc_request_set_replen(req);
1546
1547         aa = ptlrpc_req_async_args(aa, req);
1548         aa->aa_oa = oa;
1549         aa->aa_requested_nob = requested_nob;
1550         aa->aa_nio_count = niocount;
1551         aa->aa_page_count = page_count;
1552         aa->aa_resends = 0;
1553         aa->aa_ppga = pga;
1554         aa->aa_cli = cli;
1555         INIT_LIST_HEAD(&aa->aa_oaps);
1556
1557         *reqp = req;
1558         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1559         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1560                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1561                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1562         RETURN(0);
1563
1564  out:
1565         ptlrpc_req_finished(req);
1566         RETURN(rc);
1567 }
1568
1569 char dbgcksum_file_name[PATH_MAX];
1570
1571 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1572                                 struct brw_page **pga, __u32 server_cksum,
1573                                 __u32 client_cksum)
1574 {
1575         struct file *filp;
1576         int rc, i;
1577         unsigned int len;
1578         char *buf;
1579
1580         /* will only keep a dump of the pages on the first error for the same
1581          * range in a file/fid, not during resends/retries. */
1582         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1583                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1584                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1585                   libcfs_debug_file_path_arr :
1586                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1587                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1588                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1590                  pga[0]->off,
1591                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1592                  client_cksum, server_cksum);
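        /*
         * With the default "/tmp/lustre-log" debug path this produces a name
         * like (illustrative values only):
         *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-c1d2e3f4-a5b6c7d8
         * i.e. parent FID, byte range of the dump, client and server checksums.
         */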
1593         filp = filp_open(dbgcksum_file_name,
1594                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1595         if (IS_ERR(filp)) {
1596                 rc = PTR_ERR(filp);
1597                 if (rc == -EEXIST)
1598                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1599                                "checksum error: rc = %d\n", dbgcksum_file_name,
1600                                rc);
1601                 else
1602                         CERROR("%s: can't open to dump pages with checksum "
1603                                "error: rc = %d\n", dbgcksum_file_name, rc);
1604                 return;
1605         }
1606
1607         for (i = 0; i < page_count; i++) {
1608                 len = pga[i]->count;
1609                 buf = kmap(pga[i]->pg);
1610                 while (len != 0) {
1611                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1612                         if (rc < 0) {
1613                                 CERROR("%s: wanted to write %u but got %d "
1614                                        "error\n", dbgcksum_file_name, len, rc);
1615                                 break;
1616                         }
1617                         len -= rc;
1618                         buf += rc;
1619                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1620                                dbgcksum_file_name, rc);
1621                 }
1622                 kunmap(pga[i]->pg);
1623         }
1624
1625         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1626         if (rc)
1627                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1628         filp_close(filp, NULL);
1629 }
1630
1631 static int
1632 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1633                      __u32 client_cksum, __u32 server_cksum,
1634                      struct osc_brw_async_args *aa)
1635 {
1636         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1637         enum cksum_types cksum_type;
1638         obd_dif_csum_fn *fn = NULL;
1639         int sector_size = 0;
1640         __u32 new_cksum;
1641         char *msg;
1642         int rc;
1643
1644         if (server_cksum == client_cksum) {
1645                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1646                 return 0;
1647         }
1648
1649         if (aa->aa_cli->cl_checksum_dump)
1650                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1651                                     server_cksum, client_cksum);
1652
1653         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1654                                            oa->o_flags : 0);
1655
1656         switch (cksum_type) {
1657         case OBD_CKSUM_T10IP512:
1658                 fn = obd_dif_ip_fn;
1659                 sector_size = 512;
1660                 break;
1661         case OBD_CKSUM_T10IP4K:
1662                 fn = obd_dif_ip_fn;
1663                 sector_size = 4096;
1664                 break;
1665         case OBD_CKSUM_T10CRC512:
1666                 fn = obd_dif_crc_fn;
1667                 sector_size = 512;
1668                 break;
1669         case OBD_CKSUM_T10CRC4K:
1670                 fn = obd_dif_crc_fn;
1671                 sector_size = 4096;
1672                 break;
1673         default:
1674                 break;
1675         }
1676
1677         if (fn)
1678                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1679                                              aa->aa_page_count, aa->aa_ppga,
1680                                              OST_WRITE, fn, sector_size,
1681                                              &new_cksum);
1682         else
1683                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1684                                        aa->aa_ppga, OST_WRITE, cksum_type,
1685                                        &new_cksum);
1686
1687         if (rc < 0)
1688                 msg = "failed to calculate the client write checksum";
1689         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1690                 msg = "the server did not use the checksum type specified in "
1691                       "the original request - likely a protocol problem";
1692         else if (new_cksum == server_cksum)
1693                 msg = "changed on the client after we checksummed it - "
1694                       "likely false positive due to mmap IO (bug 11742)";
1695         else if (new_cksum == client_cksum)
1696                 msg = "changed in transit before arrival at OST";
1697         else
1698                 msg = "changed in transit AND doesn't match the original - "
1699                       "likely false positive due to mmap IO (bug 11742)";
1700
1701         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1702                            DFID " object "DOSTID" extent [%llu-%llu], original "
1703                            "client csum %x (type %x), server csum %x (type %x),"
1704                            " client csum now %x\n",
1705                            obd_name, msg, libcfs_nid2str(peer->nid),
1706                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1707                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1708                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1709                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1710                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1711                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1712                            client_cksum,
1713                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1714                            server_cksum, cksum_type, new_cksum);
1715         return 1;
1716 }
1717
1718 /* Note rc enters this function as the number of bytes transferred */
1719 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1720 {
1721         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1722         struct client_obd *cli = aa->aa_cli;
1723         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1724         const struct lnet_process_id *peer =
1725                 &req->rq_import->imp_connection->c_peer;
1726         struct ost_body *body;
1727         u32 client_cksum = 0;
1728
1729         ENTRY;
1730
1731         if (rc < 0 && rc != -EDQUOT) {
1732                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1733                 RETURN(rc);
1734         }
1735
1736         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1737         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1738         if (body == NULL) {
1739                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1740                 RETURN(-EPROTO);
1741         }
1742
1743         /* set/clear over quota flag for a uid/gid/projid */
1744         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1745             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1746                 unsigned qid[LL_MAXQUOTAS] = {
1747                                          body->oa.o_uid, body->oa.o_gid,
1748                                          body->oa.o_projid };
1749                 CDEBUG(D_QUOTA,
1750                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1751                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1752                        body->oa.o_valid, body->oa.o_flags);
1753                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1754                                 body->oa.o_flags);
1755         }
1756
1757         osc_update_grant(cli, body);
1758
1759         if (rc < 0)
1760                 RETURN(rc);
1761
1762         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1763                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1764
1765         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1766                 if (rc > 0) {
1767                         CERROR("%s: unexpected positive size %d\n",
1768                                obd_name, rc);
1769                         RETURN(-EPROTO);
1770                 }
1771
1772                 if (req->rq_bulk != NULL &&
1773                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1774                         RETURN(-EAGAIN);
1775
1776                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1777                     check_write_checksum(&body->oa, peer, client_cksum,
1778                                          body->oa.o_cksum, aa))
1779                         RETURN(-EAGAIN);
1780
1781                 rc = check_write_rcs(req, aa->aa_requested_nob,
1782                                      aa->aa_nio_count, aa->aa_page_count,
1783                                      aa->aa_ppga);
1784                 GOTO(out, rc);
1785         }
1786
1787         /* The rest of this function executes only for OST_READs */
1788
1789         if (req->rq_bulk == NULL) {
1790                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1791                                           RCL_SERVER);
1792                 LASSERT(rc == req->rq_status);
1793         } else {
1794                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1795                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1796         }
1797         if (rc < 0)
1798                 GOTO(out, rc = -EAGAIN);
1799
1800         if (rc > aa->aa_requested_nob) {
1801                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1802                        rc, aa->aa_requested_nob);
1803                 RETURN(-EPROTO);
1804         }
1805
1806         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1807                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1808                        rc, req->rq_bulk->bd_nob_transferred);
1809                 RETURN(-EPROTO);
1810         }
1811
1812         if (req->rq_bulk == NULL) {
1813                 /* short io */
1814                 int nob, pg_count, i = 0;
1815                 unsigned char *buf;
1816
1817                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1818                 pg_count = aa->aa_page_count;
1819                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1820                                                    rc);
1821                 nob = rc;
1822                 while (nob > 0 && pg_count > 0) {
1823                         unsigned char *ptr;
1824                         int count = aa->aa_ppga[i]->count > nob ?
1825                                     nob : aa->aa_ppga[i]->count;
1826
1827                         CDEBUG(D_CACHE, "page %p count %d\n",
1828                                aa->aa_ppga[i]->pg, count);
1829                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1830                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1831                                count);
1832                         kunmap_atomic((void *) ptr);
1833
1834                         buf += count;
1835                         nob -= count;
1836                         i++;
1837                         pg_count--;
1838                 }
1839         }
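        /*
         * Example of the copy loop above (illustrative): if the server
         * returned rc = 6000 bytes for two pages with count = 4096 each,
         * the first iteration copies 4096 bytes into page 0, the second
         * copies min(4096, 1904) = 1904 bytes into page 1, and the
         * rc < aa_requested_nob check below lets handle_short_read()
         * deal with the unfilled remainder.
         */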
1840
1841         if (rc < aa->aa_requested_nob)
1842                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1843
1844         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1845                 static int cksum_counter;
1846                 u32        server_cksum = body->oa.o_cksum;
1847                 char      *via = "";
1848                 char      *router = "";
1849                 enum cksum_types cksum_type;
1850                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1851                         body->oa.o_flags : 0;
1852
1853                 cksum_type = obd_cksum_type_unpack(o_flags);
1854                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1855                                           aa->aa_page_count, aa->aa_ppga,
1856                                           OST_READ, &client_cksum);
1857                 if (rc < 0)
1858                         GOTO(out, rc);
1859
1860                 if (req->rq_bulk != NULL &&
1861                     peer->nid != req->rq_bulk->bd_sender) {
1862                         via = " via ";
1863                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1864                 }
1865
1866                 if (server_cksum != client_cksum) {
1867                         struct ost_body *clbody;
1868                         u32 page_count = aa->aa_page_count;
1869
1870                         clbody = req_capsule_client_get(&req->rq_pill,
1871                                                         &RMF_OST_BODY);
1872                         if (cli->cl_checksum_dump)
1873                                 dump_all_bulk_pages(&clbody->oa, page_count,
1874                                                     aa->aa_ppga, server_cksum,
1875                                                     client_cksum);
1876
1877                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1878                                            "%s%s%s inode "DFID" object "DOSTID
1879                                            " extent [%llu-%llu], client %x, "
1880                                            "server %x, cksum_type %x\n",
1881                                            obd_name,
1882                                            libcfs_nid2str(peer->nid),
1883                                            via, router,
1884                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1885                                                 clbody->oa.o_parent_seq : 0ULL,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_oid : 0,
1888                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1889                                                 clbody->oa.o_parent_ver : 0,
1890                                            POSTID(&body->oa.o_oi),
1891                                            aa->aa_ppga[0]->off,
1892                                            aa->aa_ppga[page_count-1]->off +
1893                                            aa->aa_ppga[page_count-1]->count - 1,
1894                                            client_cksum, server_cksum,
1895                                            cksum_type);
1896                         cksum_counter = 0;
1897                         aa->aa_oa->o_cksum = client_cksum;
1898                         rc = -EAGAIN;
1899                 } else {
1900                         cksum_counter++;
1901                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1902                         rc = 0;
1903                 }
1904         } else if (unlikely(client_cksum)) {
1905                 static int cksum_missed;
1906
1907                 cksum_missed++;
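                /* x & -x isolates the lowest set bit, so the test below is
                 * true only when cksum_missed is a power of two (1, 2, 4,
                 * 8, ...), rate-limiting this error message exponentially */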
1908                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1909                         CERROR("%s: checksum %u requested from %s but not sent\n",
1910                                obd_name, cksum_missed,
1911                                libcfs_nid2str(peer->nid));
1912         } else {
1913                 rc = 0;
1914         }
1915 out:
1916         if (rc >= 0)
1917                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1918                                      aa->aa_oa, &body->oa);
1919
1920         RETURN(rc);
1921 }
1922
1923 static int osc_brw_redo_request(struct ptlrpc_request *request,
1924                                 struct osc_brw_async_args *aa, int rc)
1925 {
1926         struct ptlrpc_request *new_req;
1927         struct osc_brw_async_args *new_aa;
1928         struct osc_async_page *oap;
1929         ENTRY;
1930
1931         /* The below message is checked in replay-ost-single.sh test_8ae */
1932         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1933                   "redo for recoverable error %d", rc);
1934
1935         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1936                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1937                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1938                                   aa->aa_ppga, &new_req, 1);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1943                 if (oap->oap_request != NULL) {
1944                         LASSERTF(request == oap->oap_request,
1945                                  "request %p != oap_request %p\n",
1946                                  request, oap->oap_request);
1947                 }
1948         }
1949         /*
1950          * New request takes over pga and oaps from old request.
1951          * Note that copying a list_head doesn't work, need to move it...
1952          */
1953         aa->aa_resends++;
1954         new_req->rq_interpret_reply = request->rq_interpret_reply;
1955         new_req->rq_async_args = request->rq_async_args;
1956         new_req->rq_commit_cb = request->rq_commit_cb;
1957         /* cap resend delay to the current request timeout, this is similar to
1958          * what ptlrpc does (see after_reply()) */
1959         if (aa->aa_resends > new_req->rq_timeout)
1960                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1961         else
1962                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1963         new_req->rq_generation_set = 1;
1964         new_req->rq_import_generation = request->rq_import_generation;
1965
1966         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1967
1968         INIT_LIST_HEAD(&new_aa->aa_oaps);
1969         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1970         INIT_LIST_HEAD(&new_aa->aa_exts);
1971         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1972         new_aa->aa_resends = aa->aa_resends;
1973
1974         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1975                 if (oap->oap_request) {
1976                         ptlrpc_req_finished(oap->oap_request);
1977                         oap->oap_request = ptlrpc_request_addref(new_req);
1978                 }
1979         }
1980
1981         /* XXX: This code will run into problems if we ever support adding a
1982          * series of BRW RPCs into a self-defined ptlrpc_request_set and
1983          * waiting for all of them to finish. We should inherit the request
1984          * set from the old request. */
1985         ptlrpcd_add_req(new_req);
1986
1987         DEBUG_REQ(D_INFO, new_req, "new request");
1988         RETURN(0);
1989 }
1990
1991 /*
1992  * Ugh, we want disk allocation on the target to happen in offset order. We'll
1993  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1994  * fine for our small page arrays and doesn't require allocation. It's an
1995  * insertion sort that swaps elements that are strides apart, shrinking the
1996  * stride down until it's 1 and the array is sorted.
1997  */
1998 static void sort_brw_pages(struct brw_page **array, int num)
1999 {
2000         int stride, i, j;
2001         struct brw_page *tmp;
2002
2003         if (num == 1)
2004                 return;
2005         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2006                 ;
2007
2008         do {
2009                 stride /= 3;
2010                 for (i = stride ; i < num ; i++) {
2011                         tmp = array[i];
2012                         j = i;
2013                         while (j >= stride && array[j - stride]->off > tmp->off) {
2014                                 array[j] = array[j - stride];
2015                                 j -= stride;
2016                         }
2017                         array[j] = tmp;
2018                 }
2019         } while (stride > 1);
2020 }
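/*
 * Gap-sequence example (illustrative): for num = 10 the first loop grows
 * stride 1 -> 4 -> 13 and stops at 13; the do-while then sorts with gaps
 * 13/3 = 4 and 4/3 = 1, the final gap-1 pass being a plain insertion sort
 * over an almost-sorted array.
 */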
2021
2022 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2023 {
2024         LASSERT(ppga != NULL);
2025         OBD_FREE(ppga, sizeof(*ppga) * count);
2026 }
2027
2028 static int brw_interpret(const struct lu_env *env,
2029                          struct ptlrpc_request *req, void *args, int rc)
2030 {
2031         struct osc_brw_async_args *aa = args;
2032         struct osc_extent *ext;
2033         struct osc_extent *tmp;
2034         struct client_obd *cli = aa->aa_cli;
2035         unsigned long transferred = 0;
2036
2037         ENTRY;
2038
2039         rc = osc_brw_fini_request(req, rc);
2040         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2041         /*
2042          * When server returns -EINPROGRESS, client should always retry
2043          * regardless of the number of times the bulk was resent already.
2044          */
2045         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2046                 if (req->rq_import_generation !=
2047                     req->rq_import->imp_generation) {
2048                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2049                                ""DOSTID", rc = %d.\n",
2050                                req->rq_import->imp_obd->obd_name,
2051                                POSTID(&aa->aa_oa->o_oi), rc);
2052                 } else if (rc == -EINPROGRESS ||
2053                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2054                         rc = osc_brw_redo_request(req, aa, rc);
2055                 } else {
2056                         CERROR("%s: too many resent retries for object: "
2057                                "%llu:%llu, rc = %d.\n",
2058                                req->rq_import->imp_obd->obd_name,
2059                                POSTID(&aa->aa_oa->o_oi), rc);
2060                 }
2061
2062                 if (rc == 0)
2063                         RETURN(0);
2064                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2065                         rc = -EIO;
2066         }
2067
2068         if (rc == 0) {
2069                 struct obdo *oa = aa->aa_oa;
2070                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2071                 unsigned long valid = 0;
2072                 struct cl_object *obj;
2073                 struct osc_async_page *last;
2074
2075                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2076                 obj = osc2cl(last->oap_obj);
2077
2078                 cl_object_attr_lock(obj);
2079                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2080                         attr->cat_blocks = oa->o_blocks;
2081                         valid |= CAT_BLOCKS;
2082                 }
2083                 if (oa->o_valid & OBD_MD_FLMTIME) {
2084                         attr->cat_mtime = oa->o_mtime;
2085                         valid |= CAT_MTIME;
2086                 }
2087                 if (oa->o_valid & OBD_MD_FLATIME) {
2088                         attr->cat_atime = oa->o_atime;
2089                         valid |= CAT_ATIME;
2090                 }
2091                 if (oa->o_valid & OBD_MD_FLCTIME) {
2092                         attr->cat_ctime = oa->o_ctime;
2093                         valid |= CAT_CTIME;
2094                 }
2095
2096                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2097                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2098                         loff_t last_off = last->oap_count + last->oap_obj_off +
2099                                 last->oap_page_off;
2100
2101                         /* Change file size if this is an out of quota or
2102                          * direct IO write and it extends the file size */
2103                         if (loi->loi_lvb.lvb_size < last_off) {
2104                                 attr->cat_size = last_off;
2105                                 valid |= CAT_SIZE;
2106                         }
2107                         /* Extend KMS if it's not a lockless write */
2108                         if (loi->loi_kms < last_off &&
2109                             oap2osc_page(last)->ops_srvlock == 0) {
2110                                 attr->cat_kms = last_off;
2111                                 valid |= CAT_KMS;
2112                         }
2113                 }
2114
2115                 if (valid != 0)
2116                         cl_object_attr_update(env, obj, attr, valid);
2117                 cl_object_attr_unlock(obj);
2118         }
2119         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2120         aa->aa_oa = NULL;
2121
2122         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2123                 osc_inc_unstable_pages(req);
2124
2125         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2126                 list_del_init(&ext->oe_link);
2127                 osc_extent_finish(env, ext, 1,
2128                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2129         }
2130         LASSERT(list_empty(&aa->aa_exts));
2131         LASSERT(list_empty(&aa->aa_oaps));
2132
2133         transferred = (req->rq_bulk == NULL ? /* short io */
2134                        aa->aa_requested_nob :
2135                        req->rq_bulk->bd_nob_transferred);
2136
2137         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2138         ptlrpc_lprocfs_brw(req, transferred);
2139
2140         spin_lock(&cli->cl_loi_list_lock);
2141         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2142          * is called so we know whether to go to sync BRWs or wait for more
2143          * RPCs to complete */
2144         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2145                 cli->cl_w_in_flight--;
2146         else
2147                 cli->cl_r_in_flight--;
2148         osc_wake_cache_waiters(cli);
2149         spin_unlock(&cli->cl_loi_list_lock);
2150
2151         osc_io_unplug(env, cli, NULL);
2152         RETURN(rc);
2153 }
2154
2155 static void brw_commit(struct ptlrpc_request *req)
2156 {
2157         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2158          * this function, called via the rq_commit_cb, we need to ensure
2159          * osc_dec_unstable_pages is still called. Otherwise unstable
2160          * pages may be leaked. */
2161         spin_lock(&req->rq_lock);
2162         if (likely(req->rq_unstable)) {
2163                 req->rq_unstable = 0;
2164                 spin_unlock(&req->rq_lock);
2165
2166                 osc_dec_unstable_pages(req);
2167         } else {
2168                 req->rq_committed = 1;
2169                 spin_unlock(&req->rq_lock);
2170         }
2171 }
2172
2173 /**
2174  * Build an RPC from the list of extents @ext_list. The caller must ensure
2175  * that the total number of pages in this list is NOT over the max pages per
2176  * RPC. Extents in the list must be in OES_RPC state.
2177  */
2178 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2179                   struct list_head *ext_list, int cmd)
2180 {
2181         struct ptlrpc_request           *req = NULL;
2182         struct osc_extent               *ext;
2183         struct brw_page                 **pga = NULL;
2184         struct osc_brw_async_args       *aa = NULL;
2185         struct obdo                     *oa = NULL;
2186         struct osc_async_page           *oap;
2187         struct osc_object               *obj = NULL;
2188         struct cl_req_attr              *crattr = NULL;
2189         loff_t                          starting_offset = OBD_OBJECT_EOF;
2190         loff_t                          ending_offset = 0;
2191         int                             mpflag = 0;
2192         int                             mem_tight = 0;
2193         int                             page_count = 0;
2194         bool                            soft_sync = false;
2195         bool                            ndelay = false;
2196         int                             i;
2197         int                             grant = 0;
2198         int                             rc;
2199         __u32                           layout_version = 0;
2200         LIST_HEAD(rpc_list);
2201         struct ost_body                 *body;
2202         ENTRY;
2203         LASSERT(!list_empty(ext_list));
2204
2205         /* add pages into rpc_list to build BRW rpc */
2206         list_for_each_entry(ext, ext_list, oe_link) {
2207                 LASSERT(ext->oe_state == OES_RPC);
2208                 mem_tight |= ext->oe_memalloc;
2209                 grant += ext->oe_grants;
2210                 page_count += ext->oe_nr_pages;
2211                 layout_version = max(layout_version, ext->oe_layout_version);
2212                 if (obj == NULL)
2213                         obj = ext->oe_obj;
2214         }
2215
2216         soft_sync = osc_over_unstable_soft_limit(cli);
2217         if (mem_tight)
2218                 mpflag = cfs_memory_pressure_get_and_set();
2219
2220         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2221         if (pga == NULL)
2222                 GOTO(out, rc = -ENOMEM);
2223
2224         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2225         if (oa == NULL)
2226                 GOTO(out, rc = -ENOMEM);
2227
2228         i = 0;
2229         list_for_each_entry(ext, ext_list, oe_link) {
2230                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2231                         if (mem_tight)
2232                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2233                         if (soft_sync)
2234                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2235                         pga[i] = &oap->oap_brw_page;
2236                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2237                         i++;
2238
2239                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2240                         if (starting_offset == OBD_OBJECT_EOF ||
2241                             starting_offset > oap->oap_obj_off)
2242                                 starting_offset = oap->oap_obj_off;
2243                         else
2244                                 LASSERT(oap->oap_page_off == 0);
2245                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2246                                 ending_offset = oap->oap_obj_off +
2247                                                 oap->oap_count;
2248                         else
2249                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2250                                         PAGE_SIZE);
2251                 }
2252                 if (ext->oe_ndelay)
2253                         ndelay = true;
2254         }
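        /*
         * Illustrative example (assuming 4 KiB pages): an extent covering
         * file bytes [8192, 16384) yields starting_offset = 8192 and
         * ending_offset = 16384 here; starting_offset is later shifted by
         * PAGE_SHIFT (page index 2) for the per-RPC offset histogram.
         */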
2255
2256         /* first page in the list */
2257         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2258
2259         crattr = &osc_env_info(env)->oti_req_attr;
2260         memset(crattr, 0, sizeof(*crattr));
2261         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2262         crattr->cra_flags = ~0ULL;
2263         crattr->cra_page = oap2cl_page(oap);
2264         crattr->cra_oa = oa;
2265         cl_req_attr_set(env, osc2cl(obj), crattr);
2266
2267         if (cmd == OBD_BRW_WRITE) {
2268                 oa->o_grant_used = grant;
2269                 if (layout_version > 0) {
2270                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2271                                PFID(&oa->o_oi.oi_fid), layout_version);
2272
2273                         oa->o_layout_version = layout_version;
2274                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2275                 }
2276         }
2277
2278         sort_brw_pages(pga, page_count);
2279         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2280         if (rc != 0) {
2281                 CERROR("prep_req failed: %d\n", rc);
2282                 GOTO(out, rc);
2283         }
2284
2285         req->rq_commit_cb = brw_commit;
2286         req->rq_interpret_reply = brw_interpret;
2287         req->rq_memalloc = mem_tight != 0;
2288         oap->oap_request = ptlrpc_request_addref(req);
2289         if (ndelay) {
2290                 req->rq_no_resend = req->rq_no_delay = 1;
2291                 /* probably set a shorter timeout value here to handle
2292                  * ETIMEDOUT in brw_interpret() correctly. */
2293                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2294         }
2295
2296         /* Need to update the timestamps after the request is built in case
2297          * we race with setattr (locally or in queue at the OST).  If the OST
2298          * gets a later setattr before an earlier BRW (as determined by the
2299          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2300          * is no obvious way to do this in a single call.  bug 10150 */
2301         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2302         crattr->cra_oa = &body->oa;
2303         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2304         cl_req_attr_set(env, osc2cl(obj), crattr);
2305         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2306
2307         aa = ptlrpc_req_async_args(aa, req);
2308         INIT_LIST_HEAD(&aa->aa_oaps);
2309         list_splice_init(&rpc_list, &aa->aa_oaps);
2310         INIT_LIST_HEAD(&aa->aa_exts);
2311         list_splice_init(ext_list, &aa->aa_exts);
2312
2313         spin_lock(&cli->cl_loi_list_lock);
2314         starting_offset >>= PAGE_SHIFT;
2315         if (cmd == OBD_BRW_READ) {
2316                 cli->cl_r_in_flight++;
2317                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2318                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2319                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2320                                       starting_offset + 1);
2321         } else {
2322                 cli->cl_w_in_flight++;
2323                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2324                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2325                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2326                                       starting_offset + 1);
2327         }
2328         spin_unlock(&cli->cl_loi_list_lock);
2329
2330         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2331                   page_count, aa, cli->cl_r_in_flight,
2332                   cli->cl_w_in_flight);
2333         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2334
2335         ptlrpcd_add_req(req);
2336         rc = 0;
2337         EXIT;
2338
2339 out:
2340         if (mem_tight != 0)
2341                 cfs_memory_pressure_restore(mpflag);
2342
2343         if (rc != 0) {
2344                 LASSERT(req == NULL);
2345
2346                 if (oa)
2347                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2348                 if (pga)
2349                         OBD_FREE(pga, sizeof(*pga) * page_count);
2350                 /* this should happen rarely and is pretty bad; it makes the
2351                  * pending list not follow the dirty order */
2352                 while (!list_empty(ext_list)) {
2353                         ext = list_entry(ext_list->next, struct osc_extent,
2354                                          oe_link);
2355                         list_del_init(&ext->oe_link);
2356                         osc_extent_finish(env, ext, 0, rc);
2357                 }
2358         }
2359         RETURN(rc);
2360 }
2361
2362 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2363 {
2364         int set = 0;
2365
2366         LASSERT(lock != NULL);
2367
2368         lock_res_and_lock(lock);
2369
2370         if (lock->l_ast_data == NULL)
2371                 lock->l_ast_data = data;
2372         if (lock->l_ast_data == data)
2373                 set = 1;
2374
2375         unlock_res_and_lock(lock);
2376
2377         return set;
2378 }
2379
2380 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2381                      void *cookie, struct lustre_handle *lockh,
2382                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2383                      int errcode)
2384 {
2385         bool intent = *flags & LDLM_FL_HAS_INTENT;
2386         int rc;
2387         ENTRY;
2388
2389         /* The request was created before the ldlm_cli_enqueue() call. */
2390         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2391                 struct ldlm_reply *rep;
2392
2393                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2394                 LASSERT(rep != NULL);
2395
2396                 rep->lock_policy_res1 =
2397                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2398                 if (rep->lock_policy_res1)
2399                         errcode = rep->lock_policy_res1;
2400                 if (!speculative)
2401                         *flags |= LDLM_FL_LVB_READY;
2402         } else if (errcode == ELDLM_OK) {
2403                 *flags |= LDLM_FL_LVB_READY;
2404         }
2405
2406         /* Call the update callback. */
2407         rc = (*upcall)(cookie, lockh, errcode);
2408
2409         /* release the reference taken in ldlm_cli_enqueue() */
2410         if (errcode == ELDLM_LOCK_MATCHED)
2411                 errcode = ELDLM_OK;
2412         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2413                 ldlm_lock_decref(lockh, mode);
2414
2415         RETURN(rc);
2416 }
2417
2418 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2419                           void *args, int rc)
2420 {
2421         struct osc_enqueue_args *aa = args;
2422         struct ldlm_lock *lock;
2423         struct lustre_handle *lockh = &aa->oa_lockh;
2424         enum ldlm_mode mode = aa->oa_mode;
2425         struct ost_lvb *lvb = aa->oa_lvb;
2426         __u32 lvb_len = sizeof(*lvb);
2427         __u64 flags = 0;
2428
2429         ENTRY;
2430
2431         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2432          * be valid. */
2433         lock = ldlm_handle2lock(lockh);
2434         LASSERTF(lock != NULL,
2435                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2436                  lockh->cookie, req, aa);
2437
2438         /* Take an additional reference so that a blocking AST that
2439          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2440          * to arrive after an upcall has been executed by
2441          * osc_enqueue_fini(). */
2442         ldlm_lock_addref(lockh, mode);
2443
2444         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2445         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2446
2447         /* Let the CP AST grant the lock first. */
2448         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2449
2450         if (aa->oa_speculative) {
2451                 LASSERT(aa->oa_lvb == NULL);
2452                 LASSERT(aa->oa_flags == NULL);
2453                 aa->oa_flags = &flags;
2454         }
2455
2456         /* Complete the lock-obtaining procedure. */
2457         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2458                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2459                                    lockh, rc);
2460         /* Complete osc stuff. */
2461         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2462                               aa->oa_flags, aa->oa_speculative, rc);
2463
2464         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2465
2466         ldlm_lock_decref(lockh, mode);
2467         LDLM_LOCK_PUT(lock);
2468         RETURN(rc);
2469 }
2470
2471 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2472
2473 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2474  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2475  * other synchronous requests; however, keeping some locks and trying to obtain
2476  * others may take a considerable amount of time in the case of OST failure, and
2477  * when other sync requests do not get the released lock from a client, the
2478  * client is evicted from the cluster -- such scenarios make life difficult, so
2479  * release locks just after they are obtained. */
2480 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2481                      __u64 *flags, union ldlm_policy_data *policy,
2482                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2483                      void *cookie, struct ldlm_enqueue_info *einfo,
2484                      struct ptlrpc_request_set *rqset, int async,
2485                      bool speculative)
2486 {
2487         struct obd_device *obd = exp->exp_obd;
2488         struct lustre_handle lockh = { 0 };
2489         struct ptlrpc_request *req = NULL;
2490         int intent = *flags & LDLM_FL_HAS_INTENT;
2491         __u64 match_flags = *flags;
2492         enum ldlm_mode mode;
2493         int rc;
2494         ENTRY;
2495
2496         /* Filesystem lock extents are extended to page boundaries so that
2497          * dealing with the page cache is a little smoother.  */
2498         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2499         policy->l_extent.end |= ~PAGE_MASK;
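        /*
         * Example of the rounding above (illustrative, 4 KiB pages): a
         * request for bytes [5000, 6000] becomes [4096, 8191] -- the start
         * is rounded down and the end rounded up to page boundaries.
         */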
2500
2501         /* Next, search for already existing extent locks that will cover us */
2502         /* If we're trying to read, we also search for an existing PW lock.  The
2503          * VFS and page cache already protect us locally, so lots of readers/
2504          * writers can share a single PW lock.
2505          *
2506          * There are problems with conversion deadlocks, so instead of
2507          * converting a read lock to a write lock, we'll just enqueue a new
2508          * one.
2509          *
2510          * At some point we should cancel the read lock instead of making them
2511          * send us a blocking callback, but there are problems with canceling
2512          * locks out from other users right now, too. */
2513         mode = einfo->ei_mode;
2514         if (einfo->ei_mode == LCK_PR)
2515                 mode |= LCK_PW;
2516         /* Normal lock requests must wait for the LVB to be ready before
2517          * matching a lock; speculative lock requests do not need to,
2518          * because they will not actually use the lock. */
2519         if (!speculative)
2520                 match_flags |= LDLM_FL_LVB_READY;
2521         if (intent != 0)
2522                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2523         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2524                                einfo->ei_type, policy, mode, &lockh, 0);
2525         if (mode) {
2526                 struct ldlm_lock *matched;
2527
2528                 if (*flags & LDLM_FL_TEST_LOCK)
2529                         RETURN(ELDLM_OK);
2530
2531                 matched = ldlm_handle2lock(&lockh);
2532                 if (speculative) {
2533                         /* This DLM lock request is speculative, and does not
2534                          * have an associated IO request. Therefore, if there
2535                          * is already a DLM lock, it will just inform the
2536                          * caller to cancel the request for this stripe. */
2537                         lock_res_and_lock(matched);
2538                         if (ldlm_extent_equal(&policy->l_extent,
2539                             &matched->l_policy_data.l_extent))
2540                                 rc = -EEXIST;
2541                         else
2542                                 rc = -ECANCELED;
2543                         unlock_res_and_lock(matched);
2544
2545                         ldlm_lock_decref(&lockh, mode);
2546                         LDLM_LOCK_PUT(matched);
2547                         RETURN(rc);
2548                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2549                         *flags |= LDLM_FL_LVB_READY;
2550
2551                         /* We already have a lock, and it's referenced. */
2552                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2553
2554                         ldlm_lock_decref(&lockh, mode);
2555                         LDLM_LOCK_PUT(matched);
2556                         RETURN(ELDLM_OK);
2557                 } else {
2558                         ldlm_lock_decref(&lockh, mode);
2559                         LDLM_LOCK_PUT(matched);
2560                 }
2561         }
2562
2563         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2564                 RETURN(-ENOLCK);
2565
2566         if (intent) {
2567                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2568                                            &RQF_LDLM_ENQUEUE_LVB);
2569                 if (req == NULL)
2570                         RETURN(-ENOMEM);
2571
2572                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2573                 if (rc) {
2574                         ptlrpc_request_free(req);
2575                         RETURN(rc);
2576                 }
2577
2578                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2579                                      sizeof(*lvb));
2580                 ptlrpc_request_set_replen(req);
2581         }
2582
2583         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2584         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2585
2586         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2587                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2588         if (async) {
2589                 if (!rc) {
2590                         struct osc_enqueue_args *aa;
2591                         aa = ptlrpc_req_async_args(aa, req);
2592                         aa->oa_exp         = exp;
2593                         aa->oa_mode        = einfo->ei_mode;
2594                         aa->oa_type        = einfo->ei_type;
2595                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2596                         aa->oa_upcall      = upcall;
2597                         aa->oa_cookie      = cookie;
2598                         aa->oa_speculative = speculative;
2599                         if (!speculative) {
2600                                 aa->oa_flags  = flags;
2601                                 aa->oa_lvb    = lvb;
2602                         } else {
2603                                 /* Speculative locks are essentially a way to
2604                                  * enqueue a DLM lock in advance, so we don't
2605                                  * care about the result of the enqueue. */
2606                                 aa->oa_lvb    = NULL;
2607                                 aa->oa_flags  = NULL;
2608                         }
2609
2610                         req->rq_interpret_reply = osc_enqueue_interpret;
2611                         if (rqset == PTLRPCD_SET)
2612                                 ptlrpcd_add_req(req);
2613                         else
2614                                 ptlrpc_set_add_req(rqset, req);
2615                 } else if (intent) {
2616                         ptlrpc_req_finished(req);
2617                 }
2618                 RETURN(rc);
2619         }
2620
2621         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2622                               flags, speculative, rc);
2623         if (intent)
2624                 ptlrpc_req_finished(req);
2625
2626         RETURN(rc);
2627 }
2628
2629 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2630                    struct ldlm_res_id *res_id, enum ldlm_type type,
2631                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2632                    __u64 *flags, struct osc_object *obj,
2633                    struct lustre_handle *lockh, int unref)
2634 {
2635         struct obd_device *obd = exp->exp_obd;
2636         __u64 lflags = *flags;
2637         enum ldlm_mode rc;
2638         ENTRY;
2639
2640         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2641                 RETURN(-EIO);
2642
2643         /* Filesystem lock extents are extended to page boundaries so that
2644          * dealing with the page cache is a little smoother */
2645         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2646         policy->l_extent.end |= ~PAGE_MASK;
2647
2648         /* Next, search for already existing extent locks that will cover us */
2649         /* If we're trying to read, we also search for an existing PW lock.  The
2650          * VFS and page cache already protect us locally, so lots of readers/
2651          * writers can share a single PW lock. */
2652         rc = mode;
2653         if (mode == LCK_PR)
2654                 rc |= LCK_PW;
2655         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2656                              res_id, type, policy, rc, lockh, unref);
2657         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2658                 RETURN(rc);
2659
2660         if (obj != NULL) {
2661                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2662
2663                 LASSERT(lock != NULL);
2664                 if (osc_set_lock_data(lock, obj)) {
2665                         lock_res_and_lock(lock);
2666                         if (!ldlm_is_lvb_cached(lock)) {
2667                                 LASSERT(lock->l_ast_data == obj);
2668                                 osc_lock_lvb_update(env, obj, lock, NULL);
2669                                 ldlm_set_lvb_cached(lock);
2670                         }
2671                         unlock_res_and_lock(lock);
2672                 } else {
2673                         ldlm_lock_decref(lockh, rc);
2674                         rc = 0;
2675                 }
2676                 LDLM_LOCK_PUT(lock);
2677         }
2678         RETURN(rc);
2679 }
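
/*
 * Illustrative caller sketch (hypothetical, not taken from this file):
 * probe whether a granted DLM lock already covers an extent, without
 * taking a reference, by passing LDLM_FL_TEST_LOCK; res_id and policy
 * are assumed to be prepared by the caller.
 *
 *      struct lustre_handle lockh;
 *      __u64 flags = LDLM_FL_TEST_LOCK;
 *      int mode;
 *
 *      mode = osc_match_base(env, exp, &res_id, LDLM_EXTENT, &policy,
 *                            LCK_PR, &flags, NULL, &lockh, 0);
 *
 * A nonzero mode means a compatible PR or PW lock already covers the
 * extent; with LDLM_FL_TEST_LOCK there is no reference left to drop.
 */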
2680
2681 static int osc_statfs_interpret(const struct lu_env *env,
2682                                 struct ptlrpc_request *req, void *args, int rc)
2683 {
2684         struct osc_async_args *aa = args;
2685         struct obd_statfs *msfs;
2686
2687         ENTRY;
2688         if (rc == -EBADR)
2689                 /*
2690                  * The request has in fact never been sent due to issues at
2691                  * a higher level (LOV).  Exit immediately since the caller
2692                  * is aware of the problem and takes care of the clean up.
2693                  */
2694                 RETURN(rc);
2695
2696         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2697             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2698                 GOTO(out, rc = 0);
2699
2700         if (rc != 0)
2701                 GOTO(out, rc);
2702
2703         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2704         if (msfs == NULL)
2705                 GOTO(out, rc = -EPROTO);
2706
2707         *aa->aa_oi->oi_osfs = *msfs;
2708 out:
2709         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2710
2711         RETURN(rc);
2712 }
2713
2714 static int osc_statfs_async(struct obd_export *exp,
2715                             struct obd_info *oinfo, time64_t max_age,
2716                             struct ptlrpc_request_set *rqset)
2717 {
2718         struct obd_device     *obd = class_exp2obd(exp);
2719         struct ptlrpc_request *req;
2720         struct osc_async_args *aa;
2721         int rc;
2722         ENTRY;
2723
2724         if (obd->obd_osfs_age >= max_age) {
2725                 CDEBUG(D_SUPER,
2726                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2727                        obd->obd_name, &obd->obd_osfs,
2728                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2729                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2730                 spin_lock(&obd->obd_osfs_lock);
2731                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2732                 spin_unlock(&obd->obd_osfs_lock);
2733                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2734                 if (oinfo->oi_cb_up)
2735                         oinfo->oi_cb_up(oinfo, 0);
2736
2737                 RETURN(0);
2738         }
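
        /*
         * Illustrative caller sketch (hypothetical): a caller that can
         * tolerate slightly stale data bounds max_age relative to "now",
         * e.g.
         *
         *      rc = osc_statfs_async(exp, oinfo,
         *                            ktime_get_seconds() -
         *                            OBD_STATFS_CACHE_SECONDS, rqset);
         *
         * so an obd_osfs snapshot no older than OBD_STATFS_CACHE_SECONDS
         * is served from the cache above without an OST_STATFS RPC.
         */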
2739
2740         /* We could possibly pass max_age in the request (as an absolute
2741          * timestamp or a "seconds.usec ago") so the target can avoid doing
2742          * extra calls into the filesystem when not necessary (e.g. during
2743          * mount, where that would help a bit).  Having relative timestamps
2744          * is not so great if request processing is slow, while absolute
2745          * timestamps are not ideal because they need time synchronization. */
2746         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2747         if (req == NULL)
2748                 RETURN(-ENOMEM);
2749
2750         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2751         if (rc) {
2752                 ptlrpc_request_free(req);
2753                 RETURN(rc);
2754         }
2755         ptlrpc_request_set_replen(req);
2756         req->rq_request_portal = OST_CREATE_PORTAL;
2757         ptlrpc_at_set_req_timeout(req);
2758
2759         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2760                 /* procfs requests must not block waiting for the OST, to avoid deadlock */
2761                 req->rq_no_resend = 1;
2762                 req->rq_no_delay = 1;
2763         }
2764
2765         req->rq_interpret_reply = osc_statfs_interpret;
2766         aa = ptlrpc_req_async_args(aa, req);
2767         aa->aa_oi = oinfo;
2768
2769         ptlrpc_set_add_req(rqset, req);
2770         RETURN(0);
2771 }
2772
2773 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2774                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2775 {
2776         struct obd_device     *obd = class_exp2obd(exp);
2777         struct obd_statfs     *msfs;
2778         struct ptlrpc_request *req;
2779         struct obd_import     *imp = NULL;
2780         int rc;
2781         ENTRY;
2782
2783
2784         /* Since the request might also come from lprocfs, we need to
2785          * sync this with client_disconnect_export() (see bug 15684). */
2786         down_read(&obd->u.cli.cl_sem);
2787         if (obd->u.cli.cl_import)
2788                 imp = class_import_get(obd->u.cli.cl_import);
2789         up_read(&obd->u.cli.cl_sem);
2790         if (!imp)
2791                 RETURN(-ENODEV);
2792
2793         /* We could possibly pass max_age in the request (as an absolute
2794          * timestamp or a "seconds.usec ago") so the target can avoid doing
2795          * extra calls into the filesystem when not necessary (e.g. during
2796          * mount, where that would help a bit).  Having relative timestamps
2797          * is not so great if request processing is slow, while absolute
2798          * timestamps are not ideal because they need time synchronization. */
2799         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2800
2801         class_import_put(imp);
2802
2803         if (req == NULL)
2804                 RETURN(-ENOMEM);
2805
2806         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2807         if (rc) {
2808                 ptlrpc_request_free(req);
2809                 RETURN(rc);
2810         }
2811         ptlrpc_request_set_replen(req);
2812         req->rq_request_portal = OST_CREATE_PORTAL;
2813         ptlrpc_at_set_req_timeout(req);
2814
2815         if (flags & OBD_STATFS_NODELAY) {
2816                 /* procfs requests must not block waiting for the OST, to avoid deadlock */
2817                 req->rq_no_resend = 1;
2818                 req->rq_no_delay = 1;
2819         }
2820
2821         rc = ptlrpc_queue_wait(req);
2822         if (rc)
2823                 GOTO(out, rc);
2824
2825         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2826         if (msfs == NULL)
2827                 GOTO(out, rc = -EPROTO);
2828
2829         *osfs = *msfs;
2830
2831         EXIT;
2832 out:
2833         ptlrpc_req_finished(req);
2834         return rc;
2835 }
2836
2837 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2838                          void *karg, void __user *uarg)
2839 {
2840         struct obd_device *obd = exp->exp_obd;
2841         struct obd_ioctl_data *data = karg;
2842         int rc = 0;
2843
2844         ENTRY;
2845         if (!try_module_get(THIS_MODULE)) {
2846                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2847                        module_name(THIS_MODULE));
2848                 return -EINVAL;
2849         }
2850         switch (cmd) {
2851         case OBD_IOC_CLIENT_RECOVER:
2852                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2853                                            data->ioc_inlbuf1, 0);
2854                 if (rc > 0)
2855                         rc = 0;
2856                 break;
2857         case IOC_OSC_SET_ACTIVE:
2858                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2859                                               data->ioc_offset);
2860                 break;
2861         default:
2862                 rc = -ENOTTY;
2863                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2864                        obd->obd_name, cmd, current_comm(), rc);
2865                 break;
2866         }
2867
2868         module_put(THIS_MODULE);
2869         return rc;
2870 }
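
/*
 * Illustrative in-kernel usage sketch (hypothetical caller): reactivate a
 * deactivated OSC import through the generic ioctl path; for
 * IOC_OSC_SET_ACTIVE the active flag travels in ioc_offset.
 *
 *      struct obd_ioctl_data data = { .ioc_offset = 1 };
 *      int rc;
 *
 *      rc = obd_iocontrol(IOC_OSC_SET_ACTIVE, exp, sizeof(data), &data,
 *                         NULL);
 */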
2871
2872 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2873                        u32 keylen, void *key, u32 vallen, void *val,
2874                        struct ptlrpc_request_set *set)
2875 {
2876         struct ptlrpc_request *req;
2877         struct obd_device     *obd = exp->exp_obd;
2878         struct obd_import     *imp = class_exp2cliimp(exp);
2879         char                  *tmp;
2880         int                    rc;
2881         ENTRY;
2882
2883         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2884
2885         if (KEY_IS(KEY_CHECKSUM)) {
2886                 if (vallen != sizeof(int))
2887                         RETURN(-EINVAL);
2888                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2889                 RETURN(0);
2890         }
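
        /*
         * Minimal usage sketch (illustrative): KEY_CHECKSUM is handled
         * locally above, so a caller may toggle checksums without
         * supplying a request set:
         *
         *      int on = 1;
         *
         *      rc = osc_set_info_async(env, exp, strlen(KEY_CHECKSUM),
         *                              KEY_CHECKSUM, sizeof(on), &on,
         *                              NULL);
         */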
2891
2892         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2893                 sptlrpc_conf_client_adapt(obd);
2894                 RETURN(0);
2895         }
2896
2897         if (KEY_IS(KEY_FLUSH_CTX)) {
2898                 sptlrpc_import_flush_my_ctx(imp);
2899                 RETURN(0);
2900         }
2901
2902         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2903                 struct client_obd *cli = &obd->u.cli;
2904                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2905                 long target = *(long *)val;
2906
2907                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2908                 *(long *)val -= nr;
2909                 RETURN(0);
2910         }
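
        /*
         * Illustrative arithmetic (not from a real trace): with 1000
         * pages on this client's LRU and a caller target of 300, nr is
         * 500 and min(nr, target) == 300 pages are offered to
         * osc_lru_shrink(); if 200 are actually freed, *val drops from
         * 300 to 100, telling the caller how much still needs shrinking
         * elsewhere.
         */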
2911
2912         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2913                 RETURN(-EINVAL);
2914
2915         /* We pass all other commands directly to the OST. Since nobody
2916          * calls osc methods directly and everybody is supposed to go
2917          * through LOV, we assume LOV checked invalid values for us. The
2918          * only recognised values so far are evict_by_nid and mds_conn.
2919          * Even if something bad gets through, we'd get -EINVAL from the
2920          * OST anyway. */
2921
2922         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2923                                                 &RQF_OST_SET_GRANT_INFO :
2924                                                 &RQF_OBD_SET_INFO);
2925         if (req == NULL)
2926                 RETURN(-ENOMEM);
2927
2928         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2929                              RCL_CLIENT, keylen);
2930         if (!KEY_IS(KEY_GRANT_SHRINK))
2931                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2932                                      RCL_CLIENT, vallen);
2933         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2934         if (rc) {
2935                 ptlrpc_request_free(req);
2936                 RETURN(rc);
2937         }
2938
2939         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2940         memcpy(tmp, key, keylen);
2941         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2942                                                         &RMF_OST_BODY :
2943                                                         &RMF_SETINFO_VAL);
2944         memcpy(tmp, val, vallen);
2945
2946         if (KEY_IS(KEY_GRANT_SHRINK)) {
2947                 struct osc_grant_args *aa;
2948                 struct obdo *oa;
2949
2950                 aa = ptlrpc_req_async_args(aa, req);
2951                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2952                 if (!oa) {
2953                         ptlrpc_req_finished(req);
2954                         RETURN(-ENOMEM);
2955                 }
2956                 *oa = ((struct ost_body *)val)->oa;
2957                 aa->aa_oa = oa;
2958                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2959         }
2960
2961         ptlrpc_request_set_replen(req);
2962         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2963                 LASSERT(set != NULL);
2964                 ptlrpc_set_add_req(set, req);
2965                 ptlrpc_check_set(NULL, set);
2966         } else {
2967                 ptlrpcd_add_req(req);
2968         }
2969
2970         RETURN(0);
2971 }
2972 EXPORT_SYMBOL(osc_set_info_async);
2973
2974 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2975                   struct obd_device *obd, struct obd_uuid *cluuid,
2976                   struct obd_connect_data *data, void *localdata)
2977 {
2978         struct client_obd *cli = &obd->u.cli;
2979
2980         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2981                 long lost_grant;
2982                 long grant;
2983
2984                 spin_lock(&cli->cl_loi_list_lock);
2985                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2986                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2987                         /* restore ocd_grant_blkbits as client page bits */
2988                         data->ocd_grant_blkbits = PAGE_SHIFT;
2989                         grant += cli->cl_dirty_grant;
2990                 } else {
2991                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2992                 }
2993                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
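                /*
                 * Illustrative numbers (assuming 4 KiB pages and no
                 * OBD_CONNECT_GRANT_PARAM): with 1 MiB of available
                 * grant, no reserved grant and 512 dirty pages, the
                 * client re-requests 1 MiB + 512 * 4 KiB = 3 MiB; with
                 * no grant at all it falls back to 2 * cli_brw_size(obd).
                 */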
2994                 lost_grant = cli->cl_lost_grant;
2995                 cli->cl_lost_grant = 0;
2996                 spin_unlock(&cli->cl_loi_list_lock);
2997
2998                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2999                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3000                        data->ocd_version, data->ocd_grant, lost_grant);
3001         }
3002
3003         RETURN(0);
3004 }
3005 EXPORT_SYMBOL(osc_reconnect);
3006
3007 int osc_disconnect(struct obd_export *exp)
3008 {
3009         struct obd_device *obd = class_exp2obd(exp);
3010         int rc;
3011
3012         rc = client_disconnect_export(exp);
3013         /**
3014          * Initially we put del_shrink_grant before disconnect_export, but it
3015          * causes the following problem if setup (connect) and cleanup
3016          * (disconnect) are tangled together.
3017          *      connect p1                     disconnect p2
3018          *   ptlrpc_connect_import
3019          *     ...............               class_manual_cleanup
3020          *                                     osc_disconnect
3021          *                                     del_shrink_grant
3022          *   ptlrpc_connect_interpret
3023          *     osc_init_grant
3024          *   add this client to shrink list
3025          *                                      cleanup_osc
3026          * Bang! The grant shrink thread triggers the shrink (bug 18662).
3027          */
3028         osc_del_grant_list(&obd->u.cli);
3029         return rc;
3030 }
3031 EXPORT_SYMBOL(osc_disconnect);
3032
3033 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3034                                  struct hlist_node *hnode, void *arg)
3035 {
3036         struct lu_env *env = arg;
3037         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3038         struct ldlm_lock *lock;
3039         struct osc_object *osc = NULL;
3040         ENTRY;
3041
3042         lock_res(res);
3043         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3044                 if (lock->l_ast_data != NULL && osc == NULL) {
3045                         osc = lock->l_ast_data;
3046                         cl_object_get(osc2cl(osc));
3047                 }
3048
3049                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock
3050                  * will be canceled by the second ldlm_namespace_cleanup()
3051                  * call in osc_import_event(). */
3052                 ldlm_clear_cleaned(lock);
3053         }
3054         unlock_res(res);
3055
3056         if (osc != NULL) {
3057                 osc_object_invalidate(env, osc);
3058                 cl_object_put(env, osc2cl(osc));
3059         }
3060
3061         RETURN(0);
3062 }
3063 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3064
3065 static int osc_import_event(struct obd_device *obd,
3066                             struct obd_import *imp,
3067                             enum obd_import_event event)
3068 {
3069         struct client_obd *cli;
3070         int rc = 0;
3071
3072         ENTRY;
3073         LASSERT(imp->imp_obd == obd);
3074
3075         switch (event) {
3076         case IMP_EVENT_DISCON: {
3077                 cli = &obd->u.cli;
3078                 spin_lock(&cli->cl_loi_list_lock);
3079                 cli->cl_avail_grant = 0;
3080                 cli->cl_lost_grant = 0;
3081                 spin_unlock(&cli->cl_loi_list_lock);
3082                 break;
3083         }
3084         case IMP_EVENT_INACTIVE: {
3085                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3086                 break;
3087         }
3088         case IMP_EVENT_INVALIDATE: {
3089                 struct ldlm_namespace *ns = obd->obd_namespace;
3090                 struct lu_env         *env;
3091                 __u16                  refcheck;
3092
3093                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3094
3095                 env = cl_env_get(&refcheck);
3096                 if (!IS_ERR(env)) {
3097                         osc_io_unplug(env, &obd->u.cli, NULL);
3098
3099                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3100                                                  osc_ldlm_resource_invalidate,
3101                                                  env, 0);
3102                         cl_env_put(env, &refcheck);
3103
3104                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3105                 } else
3106                         rc = PTR_ERR(env);
3107                 break;
3108         }
3109         case IMP_EVENT_ACTIVE: {
3110                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3111                 break;
3112         }
3113         case IMP_EVENT_OCD: {
3114                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3115
3116                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3117                         osc_init_grant(&obd->u.cli, ocd);
3118
3119                 /* See bug 7198 */
3120                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3121                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3122
3123                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3124                 break;
3125         }
3126         case IMP_EVENT_DEACTIVATE: {
3127                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3128                 break;
3129         }
3130         case IMP_EVENT_ACTIVATE: {
3131                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3132                 break;
3133         }
3134         default:
3135                 CERROR("Unknown import event %d\n", event);
3136                 LBUG();
3137         }
3138         RETURN(rc);
3139 }
3140
3141 /**
3142  * Determine whether the lock can be canceled before replaying the lock
3143  * during recovery; see bug 16774 for detailed information.
3144  *
3145  * \retval zero the lock can't be canceled
3146  * \retval other ok to cancel
3147  */
3148 static int osc_cancel_weight(struct ldlm_lock *lock)
3149 {
3150         /*
3151          * Cancel all unused and granted extent locks.
3152          */
3153         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3154             ldlm_is_granted(lock) &&
3155             osc_ldlm_weigh_ast(lock) == 0)
3156                 RETURN(1);
3157
3158         RETURN(0);
3159 }
3160
3161 static int brw_queue_work(const struct lu_env *env, void *data)
3162 {
3163         struct client_obd *cli = data;
3164
3165         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3166
3167         osc_io_unplug(env, cli, NULL);
3168         RETURN(0);
3169 }
3170
3171 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3172 {
3173         struct client_obd *cli = &obd->u.cli;
3174         void *handler;
3175         int rc;
3176
3177         ENTRY;
3178
3179         rc = ptlrpcd_addref();
3180         if (rc)
3181                 RETURN(rc);
3182
3183         rc = client_obd_setup(obd, lcfg);
3184         if (rc)
3185                 GOTO(out_ptlrpcd, rc);
3186
3187
3188         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3189         if (IS_ERR(handler))
3190                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3191         cli->cl_writeback_work = handler;
3192
3193         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3194         if (IS_ERR(handler))
3195                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3196         cli->cl_lru_work = handler;
3197
3198         rc = osc_quota_setup(obd);
3199         if (rc)
3200                 GOTO(out_ptlrpcd_work, rc);
3201
3202         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3203         osc_update_next_shrink(cli);
3204
3205         RETURN(rc);
3206
3207 out_ptlrpcd_work:
3208         if (cli->cl_writeback_work != NULL) {
3209                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3210                 cli->cl_writeback_work = NULL;
3211         }
3212         if (cli->cl_lru_work != NULL) {
3213                 ptlrpcd_destroy_work(cli->cl_lru_work);
3214                 cli->cl_lru_work = NULL;
3215         }
3216         client_obd_cleanup(obd);
3217 out_ptlrpcd:
3218         ptlrpcd_decref();
3219         RETURN(rc);
3220 }
3221 EXPORT_SYMBOL(osc_setup_common);
3222
3223 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3224 {
3225         struct client_obd *cli = &obd->u.cli;
3226         int                adding;
3227         int                added;
3228         int                req_count;
3229         int                rc;
3230
3231         ENTRY;
3232
3233         rc = osc_setup_common(obd, lcfg);
3234         if (rc < 0)
3235                 RETURN(rc);
3236
3237         rc = osc_tunables_init(obd);
3238         if (rc)
3239                 RETURN(rc);
3240
3241         /*
3242          * We try to control the total number of requests with an upper
3243          * limit, osc_reqpool_maxreqcount. Races may occasionally push the
3244          * allocation over that limit, but that is fine.
3245          */
3246         req_count = atomic_read(&osc_pool_req_count);
3247         if (req_count < osc_reqpool_maxreqcount) {
3248                 adding = cli->cl_max_rpcs_in_flight + 2;
3249                 if (req_count + adding > osc_reqpool_maxreqcount)
3250                         adding = osc_reqpool_maxreqcount - req_count;
3251
3252                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3253                 atomic_add(added, &osc_pool_req_count);
3254         }
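
        /*
         * Illustrative arithmetic: with cl_max_rpcs_in_flight == 8 this
         * OSC asks to add 10 pooled requests; if only 4 slots remain
         * below osc_reqpool_maxreqcount, adding is clamped to 4, and
         * osc_pool_req_count grows by however many were really added.
         */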
3255
3256         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3257
3258         spin_lock(&osc_shrink_lock);
3259         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3260         spin_unlock(&osc_shrink_lock);
3261         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3262         cli->cl_import->imp_idle_debug = D_HA;
3263
3264         RETURN(0);
3265 }
3266
3267 int osc_precleanup_common(struct obd_device *obd)
3268 {
3269         struct client_obd *cli = &obd->u.cli;
3270         ENTRY;
3271
3272         /* LU-464
3273          * for echo client, export may be on zombie list, wait for
3274          * zombie thread to cull it, because cli.cl_import will be
3275          * cleared in client_disconnect_export():
3276          *   class_export_destroy() -> obd_cleanup() ->
3277          *   echo_device_free() -> echo_client_cleanup() ->
3278          *   obd_disconnect() -> osc_disconnect() ->
3279          *   client_disconnect_export()
3280          */
3281         obd_zombie_barrier();
3282         if (cli->cl_writeback_work) {
3283                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3284                 cli->cl_writeback_work = NULL;
3285         }
3286
3287         if (cli->cl_lru_work) {
3288                 ptlrpcd_destroy_work(cli->cl_lru_work);
3289                 cli->cl_lru_work = NULL;
3290         }
3291
3292         obd_cleanup_client_import(obd);
3293         RETURN(0);
3294 }
3295 EXPORT_SYMBOL(osc_precleanup_common);
3296
3297 static int osc_precleanup(struct obd_device *obd)
3298 {
3299         ENTRY;
3300
3301         osc_precleanup_common(obd);
3302
3303         ptlrpc_lprocfs_unregister_obd(obd);
3304         RETURN(0);
3305 }
3306
3307 int osc_cleanup_common(struct obd_device *obd)
3308 {
3309         struct client_obd *cli = &obd->u.cli;
3310         int rc;
3311
3312         ENTRY;
3313
3314         spin_lock(&osc_shrink_lock);
3315         list_del(&cli->cl_shrink_list);
3316         spin_unlock(&osc_shrink_lock);
3317
3318         /* lru cleanup */
3319         if (cli->cl_cache != NULL) {
3320                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3321                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3322                 list_del_init(&cli->cl_lru_osc);
3323                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3324                 cli->cl_lru_left = NULL;
3325                 cl_cache_decref(cli->cl_cache);
3326                 cli->cl_cache = NULL;
3327         }
3328
3329         /* free memory of osc quota cache */
3330         osc_quota_cleanup(obd);
3331
3332         rc = client_obd_cleanup(obd);
3333
3334         ptlrpcd_decref();
3335         RETURN(rc);
3336 }
3337 EXPORT_SYMBOL(osc_cleanup_common);
3338
3339 static const struct obd_ops osc_obd_ops = {
3340         .o_owner                = THIS_MODULE,
3341         .o_setup                = osc_setup,
3342         .o_precleanup           = osc_precleanup,
3343         .o_cleanup              = osc_cleanup_common,
3344         .o_add_conn             = client_import_add_conn,
3345         .o_del_conn             = client_import_del_conn,
3346         .o_connect              = client_connect_import,
3347         .o_reconnect            = osc_reconnect,
3348         .o_disconnect           = osc_disconnect,
3349         .o_statfs               = osc_statfs,
3350         .o_statfs_async         = osc_statfs_async,
3351         .o_create               = osc_create,
3352         .o_destroy              = osc_destroy,
3353         .o_getattr              = osc_getattr,
3354         .o_setattr              = osc_setattr,
3355         .o_iocontrol            = osc_iocontrol,
3356         .o_set_info_async       = osc_set_info_async,
3357         .o_import_event         = osc_import_event,
3358         .o_quotactl             = osc_quotactl,
3359 };
3360
3361 static struct shrinker *osc_cache_shrinker;
3362 LIST_HEAD(osc_shrink_list);
3363 DEFINE_SPINLOCK(osc_shrink_lock);
3364
3365 #ifndef HAVE_SHRINKER_COUNT
3366 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3367 {
3368         struct shrink_control scv = {
3369                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3370                 .gfp_mask   = shrink_param(sc, gfp_mask)
3371         };
3372         (void)osc_cache_shrink_scan(shrinker, &scv);
3373
3374         return osc_cache_shrink_count(shrinker, &scv);
3375 }
3376 #endif
3377
3378 static int __init osc_init(void)
3379 {
3380         unsigned int reqpool_size;
3381         unsigned int reqsize;
3382         int rc;
3383         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3384                          osc_cache_shrink_count, osc_cache_shrink_scan);
3385         ENTRY;
3386
3387         /* Print the address of _any_ initialized kernel symbol from this
3388          * module, to allow debugging with a gdb that doesn't support data
3389          * symbols from modules. */
3390         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3391
3392         rc = lu_kmem_init(osc_caches);
3393         if (rc)
3394                 RETURN(rc);
3395
3396         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3397                                  LUSTRE_OSC_NAME, &osc_device_type);
3398         if (rc)
3399                 GOTO(out_kmem, rc);
3400
3401         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3402
3403         /* This would obviously be too much memory; we only prevent overflow here */
3404         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3405                 GOTO(out_type, rc = -EINVAL);
3406
3407         reqpool_size = osc_reqpool_mem_max << 20;
3408
3409         reqsize = 1;
3410         while (reqsize < OST_IO_MAXREQSIZE)
3411                 reqsize = reqsize << 1;
3412
3413         /*
3414          * We don't grow the request count in the OSC pool according to
3415          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3416          * after a normal allocation has failed, so a small OSC pool won't
3417          * cause much performance degradation in most cases.
3418          */
3419         osc_reqpool_maxreqcount = reqpool_size / reqsize;
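
        /*
         * Worked sizing example (illustrative; the real OST_IO_MAXREQSIZE
         * varies by version): with osc_reqpool_mem_max == 5 the pool is
         * capped at 5 MiB, and if OST_IO_MAXREQSIZE rounds up to 1 MiB
         * above, the pool holds at most 5 requests.
         */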
3420
3421         atomic_set(&osc_pool_req_count, 0);
3422         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3423                                           ptlrpc_add_rqs_to_pool);
3424
3425         if (osc_rq_pool == NULL)
3426                 GOTO(out_type, rc = -ENOMEM);
3427
3428         rc = osc_start_grant_work();
3429         if (rc != 0)
3430                 GOTO(out_req_pool, rc);
3431
3432         RETURN(rc);
3433
3434 out_req_pool:
3435         ptlrpc_free_rq_pool(osc_rq_pool);
3436 out_type:
3437         class_unregister_type(LUSTRE_OSC_NAME);
3438 out_kmem:
3439         lu_kmem_fini(osc_caches);
3440
3441         RETURN(rc);
3442 }
3443
3444 static void __exit osc_exit(void)
3445 {
3446         osc_stop_grant_work();
3447         remove_shrinker(osc_cache_shrinker);
3448         class_unregister_type(LUSTRE_OSC_NAME);
3449         lu_kmem_fini(osc_caches);
3450         ptlrpc_free_rq_pool(osc_rq_pool);
3451 }
3452
3453 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3454 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3455 MODULE_VERSION(LUSTRE_VERSION_STRING);
3456 MODULE_LICENSE("GPL");
3457
3458 module_init(osc_init);
3459 module_exit(osc_exit);