Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <obd.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
48
49 #include "osc_internal.h"
50
/*
 * Shared OSC request-pool state. These are not static, so other OSC source
 * files can reference them; their users are outside this part of the file.
 * NOTE(review): presumably osc_pool_req_count counts requests taken from
 * osc_rq_pool and osc_reqpool_maxreqcount bounds it - confirm at the users.
 */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB; exported as a read-only
 * (0444) module parameter, default 5 */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
58
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
61
/* Grant-shrink requests reuse the BRW async-args layout; see the aa->aa_oa
 * accesses in osc_shrink_grant_interpret(). */
#define osc_grant_args osc_brw_async_args
63
/* Per-request async arguments consumed by osc_setattr_interpret(); filled in
 * by osc_setattr_async() and osc_punch_send(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* overwritten from the reply body */
        obd_enqueue_update_f     sa_upcall;  /* called with (sa_cookie, rc) */
        void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
69
/* Per-request async arguments consumed by osc_sync_interpret(); filled in by
 * osc_sync_base(). */
struct osc_fsync_args {
        struct osc_object       *fa_obj;     /* object whose attributes get updated */
        struct obdo             *fa_oa;      /* overwritten from the reply body */
        obd_enqueue_update_f    fa_upcall;   /* called with (fa_cookie, rc) */
        void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
76
/* Per-request async arguments consumed by osc_ladvise_interpret(); filled in
 * by osc_ladvise_base() when a request set is supplied. */
struct osc_ladvise_args {
        struct obdo             *la_oa;      /* overwritten from the reply body */
        obd_enqueue_update_f     la_upcall;  /* called with (la_cookie, rc) */
        void                    *la_cookie;  /* opaque argument for la_upcall */
};
82
/* Forward declarations of static helpers whose definitions are not in this
 * part of the file. */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
86
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
88 {
89         struct ost_body *body;
90
91         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
92         LASSERT(body);
93
94         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
95 }
96
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
98                        struct obdo *oa)
99 {
100         struct ptlrpc_request   *req;
101         struct ost_body         *body;
102         int                      rc;
103
104         ENTRY;
105         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
106         if (req == NULL)
107                 RETURN(-ENOMEM);
108
109         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
110         if (rc) {
111                 ptlrpc_request_free(req);
112                 RETURN(rc);
113         }
114
115         osc_pack_req_body(req, oa);
116
117         ptlrpc_request_set_replen(req);
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
124         if (body == NULL)
125                 GOTO(out, rc = -EPROTO);
126
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
129
130         oa->o_blksize = cli_brw_size(exp->exp_obd);
131         oa->o_valid |= OBD_MD_FLBLKSZ;
132
133         EXIT;
134 out:
135         ptlrpc_req_finished(req);
136
137         return rc;
138 }
139
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
141                        struct obdo *oa)
142 {
143         struct ptlrpc_request   *req;
144         struct ost_body         *body;
145         int                      rc;
146
147         ENTRY;
148         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
149
150         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
151         if (req == NULL)
152                 RETURN(-ENOMEM);
153
154         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
155         if (rc) {
156                 ptlrpc_request_free(req);
157                 RETURN(rc);
158         }
159
160         osc_pack_req_body(req, oa);
161
162         ptlrpc_request_set_replen(req);
163
164         rc = ptlrpc_queue_wait(req);
165         if (rc)
166                 GOTO(out, rc);
167
168         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
169         if (body == NULL)
170                 GOTO(out, rc = -EPROTO);
171
172         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
173
174         EXIT;
175 out:
176         ptlrpc_req_finished(req);
177
178         RETURN(rc);
179 }
180
181 static int osc_setattr_interpret(const struct lu_env *env,
182                                  struct ptlrpc_request *req, void *args, int rc)
183 {
184         struct osc_setattr_args *sa = args;
185         struct ost_body *body;
186
187         ENTRY;
188
189         if (rc != 0)
190                 GOTO(out, rc);
191
192         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193         if (body == NULL)
194                 GOTO(out, rc = -EPROTO);
195
196         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
197                              &body->oa);
198 out:
199         rc = sa->sa_upcall(sa->sa_cookie, rc);
200         RETURN(rc);
201 }
202
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204                       obd_enqueue_update_f upcall, void *cookie,
205                       struct ptlrpc_request_set *rqset)
206 {
207         struct ptlrpc_request   *req;
208         struct osc_setattr_args *sa;
209         int                      rc;
210
211         ENTRY;
212
213         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
214         if (req == NULL)
215                 RETURN(-ENOMEM);
216
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oa);
224
225         ptlrpc_request_set_replen(req);
226
227         /* do mds to ost setattr asynchronously */
228         if (!rqset) {
229                 /* Do not wait for response. */
230                 ptlrpcd_add_req(req);
231         } else {
232                 req->rq_interpret_reply = osc_setattr_interpret;
233
234                 sa = ptlrpc_req_async_args(sa, req);
235                 sa->sa_oa = oa;
236                 sa->sa_upcall = upcall;
237                 sa->sa_cookie = cookie;
238
239                 if (rqset == PTLRPCD_SET)
240                         ptlrpcd_add_req(req);
241                 else
242                         ptlrpc_set_add_req(rqset, req);
243         }
244
245         RETURN(0);
246 }
247
248 static int osc_ladvise_interpret(const struct lu_env *env,
249                                  struct ptlrpc_request *req,
250                                  void *arg, int rc)
251 {
252         struct osc_ladvise_args *la = arg;
253         struct ost_body *body;
254         ENTRY;
255
256         if (rc != 0)
257                 GOTO(out, rc);
258
259         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 GOTO(out, rc = -EPROTO);
262
263         *la->la_oa = body->oa;
264 out:
265         rc = la->la_upcall(la->la_cookie, rc);
266         RETURN(rc);
267 }
268
269 /**
270  * If rqset is NULL, do not wait for response. Upcall and cookie could also
271  * be NULL in this case
272  */
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274                      struct ladvise_hdr *ladvise_hdr,
275                      obd_enqueue_update_f upcall, void *cookie,
276                      struct ptlrpc_request_set *rqset)
277 {
278         struct ptlrpc_request   *req;
279         struct ost_body         *body;
280         struct osc_ladvise_args *la;
281         int                      rc;
282         struct lu_ladvise       *req_ladvise;
283         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
284         int                      num_advise = ladvise_hdr->lah_count;
285         struct ladvise_hdr      *req_ladvise_hdr;
286         ENTRY;
287
288         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289         if (req == NULL)
290                 RETURN(-ENOMEM);
291
292         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293                              num_advise * sizeof(*ladvise));
294         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295         if (rc != 0) {
296                 ptlrpc_request_free(req);
297                 RETURN(rc);
298         }
299         req->rq_request_portal = OST_IO_PORTAL;
300         ptlrpc_at_set_req_timeout(req);
301
302         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303         LASSERT(body);
304         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
305                              oa);
306
307         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308                                                  &RMF_OST_LADVISE_HDR);
309         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310
311         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313         ptlrpc_request_set_replen(req);
314
315         if (rqset == NULL) {
316                 /* Do not wait for response. */
317                 ptlrpcd_add_req(req);
318                 RETURN(0);
319         }
320
321         req->rq_interpret_reply = osc_ladvise_interpret;
322         la = ptlrpc_req_async_args(la, req);
323         la->la_oa = oa;
324         la->la_upcall = upcall;
325         la->la_cookie = cookie;
326
327         if (rqset == PTLRPCD_SET)
328                 ptlrpcd_add_req(req);
329         else
330                 ptlrpc_set_add_req(rqset, req);
331
332         RETURN(0);
333 }
334
335 static int osc_create(const struct lu_env *env, struct obd_export *exp,
336                       struct obdo *oa)
337 {
338         struct ptlrpc_request *req;
339         struct ost_body       *body;
340         int                    rc;
341         ENTRY;
342
343         LASSERT(oa != NULL);
344         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
345         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
346
347         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
348         if (req == NULL)
349                 GOTO(out, rc = -ENOMEM);
350
351         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
352         if (rc) {
353                 ptlrpc_request_free(req);
354                 GOTO(out, rc);
355         }
356
357         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358         LASSERT(body);
359
360         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
361
362         ptlrpc_request_set_replen(req);
363
364         rc = ptlrpc_queue_wait(req);
365         if (rc)
366                 GOTO(out_req, rc);
367
368         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL)
370                 GOTO(out_req, rc = -EPROTO);
371
372         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
373         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
374
375         oa->o_blksize = cli_brw_size(exp->exp_obd);
376         oa->o_valid |= OBD_MD_FLBLKSZ;
377
378         CDEBUG(D_HA, "transno: %lld\n",
379                lustre_msg_get_transno(req->rq_repmsg));
380 out_req:
381         ptlrpc_req_finished(req);
382 out:
383         RETURN(rc);
384 }
385
386 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
387                    obd_enqueue_update_f upcall, void *cookie)
388 {
389         struct ptlrpc_request *req;
390         struct osc_setattr_args *sa;
391         struct obd_import *imp = class_exp2cliimp(exp);
392         struct ost_body *body;
393         int rc;
394
395         ENTRY;
396
397         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
398         if (req == NULL)
399                 RETURN(-ENOMEM);
400
401         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
402         if (rc < 0) {
403                 ptlrpc_request_free(req);
404                 RETURN(rc);
405         }
406
407         osc_set_io_portal(req);
408
409         ptlrpc_at_set_req_timeout(req);
410
411         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
412
413         lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
414
415         ptlrpc_request_set_replen(req);
416
417         req->rq_interpret_reply = osc_setattr_interpret;
418         sa = ptlrpc_req_async_args(sa, req);
419         sa->sa_oa = oa;
420         sa->sa_upcall = upcall;
421         sa->sa_cookie = cookie;
422
423         ptlrpcd_add_req(req);
424
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(osc_punch_send);
428
429 static int osc_sync_interpret(const struct lu_env *env,
430                               struct ptlrpc_request *req, void *args, int rc)
431 {
432         struct osc_fsync_args *fa = args;
433         struct ost_body *body;
434         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
435         unsigned long valid = 0;
436         struct cl_object *obj;
437         ENTRY;
438
439         if (rc != 0)
440                 GOTO(out, rc);
441
442         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
443         if (body == NULL) {
444                 CERROR("can't unpack ost_body\n");
445                 GOTO(out, rc = -EPROTO);
446         }
447
448         *fa->fa_oa = body->oa;
449         obj = osc2cl(fa->fa_obj);
450
451         /* Update osc object's blocks attribute */
452         cl_object_attr_lock(obj);
453         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
454                 attr->cat_blocks = body->oa.o_blocks;
455                 valid |= CAT_BLOCKS;
456         }
457
458         if (valid != 0)
459                 cl_object_attr_update(env, obj, attr, valid);
460         cl_object_attr_unlock(obj);
461
462 out:
463         rc = fa->fa_upcall(fa->fa_cookie, rc);
464         RETURN(rc);
465 }
466
467 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
468                   obd_enqueue_update_f upcall, void *cookie,
469                   struct ptlrpc_request_set *rqset)
470 {
471         struct obd_export     *exp = osc_export(obj);
472         struct ptlrpc_request *req;
473         struct ost_body       *body;
474         struct osc_fsync_args *fa;
475         int                    rc;
476         ENTRY;
477
478         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
479         if (req == NULL)
480                 RETURN(-ENOMEM);
481
482         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
483         if (rc) {
484                 ptlrpc_request_free(req);
485                 RETURN(rc);
486         }
487
488         /* overload the size and blocks fields in the oa with start/end */
489         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
490         LASSERT(body);
491         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
492
493         ptlrpc_request_set_replen(req);
494         req->rq_interpret_reply = osc_sync_interpret;
495
496         fa = ptlrpc_req_async_args(fa, req);
497         fa->fa_obj = obj;
498         fa->fa_oa = oa;
499         fa->fa_upcall = upcall;
500         fa->fa_cookie = cookie;
501
502         if (rqset == PTLRPCD_SET)
503                 ptlrpcd_add_req(req);
504         else
505                 ptlrpc_set_add_req(rqset, req);
506
507         RETURN (0);
508 }
509
510 /* Find and cancel locally locks matched by @mode in the resource found by
511  * @objid. Found locks are added into @cancel list. Returns the amount of
512  * locks added to @cancels list. */
513 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
514                                    struct list_head *cancels,
515                                    enum ldlm_mode mode, __u64 lock_flags)
516 {
517         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
518         struct ldlm_res_id res_id;
519         struct ldlm_resource *res;
520         int count;
521         ENTRY;
522
523         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
524          * export) but disabled through procfs (flag in NS).
525          *
526          * This distinguishes from a case when ELC is not supported originally,
527          * when we still want to cancel locks in advance and just cancel them
528          * locally, without sending any RPC. */
529         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
530                 RETURN(0);
531
532         ostid_build_res_name(&oa->o_oi, &res_id);
533         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
534         if (IS_ERR(res))
535                 RETURN(0);
536
537         LDLM_RESOURCE_ADDREF(res);
538         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
539                                            lock_flags, 0, NULL);
540         LDLM_RESOURCE_DELREF(res);
541         ldlm_resource_putref(res);
542         RETURN(count);
543 }
544
545 static int osc_destroy_interpret(const struct lu_env *env,
546                                  struct ptlrpc_request *req, void *args, int rc)
547 {
548         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
549
550         atomic_dec(&cli->cl_destroy_in_flight);
551         wake_up(&cli->cl_destroy_waitq);
552
553         return 0;
554 }
555
556 static int osc_can_send_destroy(struct client_obd *cli)
557 {
558         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
559             cli->cl_max_rpcs_in_flight) {
560                 /* The destroy request can be sent */
561                 return 1;
562         }
563         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
564             cli->cl_max_rpcs_in_flight) {
565                 /*
566                  * The counter has been modified between the two atomic
567                  * operations.
568                  */
569                 wake_up(&cli->cl_destroy_waitq);
570         }
571         return 0;
572 }
573
574 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
575                        struct obdo *oa)
576 {
577         struct client_obd     *cli = &exp->exp_obd->u.cli;
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         struct list_head       cancels = LIST_HEAD_INIT(cancels);
581         int rc, count;
582         ENTRY;
583
584         if (!oa) {
585                 CDEBUG(D_INFO, "oa NULL\n");
586                 RETURN(-EINVAL);
587         }
588
589         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
590                                         LDLM_FL_DISCARD_DATA);
591
592         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
593         if (req == NULL) {
594                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
595                 RETURN(-ENOMEM);
596         }
597
598         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
599                                0, &cancels, count);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
606         ptlrpc_at_set_req_timeout(req);
607
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
611
612         ptlrpc_request_set_replen(req);
613
614         req->rq_interpret_reply = osc_destroy_interpret;
615         if (!osc_can_send_destroy(cli)) {
616                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
617
618                 /*
619                  * Wait until the number of on-going destroy RPCs drops
620                  * under max_rpc_in_flight
621                  */
622                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
623                                             osc_can_send_destroy(cli), &lwi);
624                 if (rc) {
625                         ptlrpc_req_finished(req);
626                         RETURN(rc);
627                 }
628         }
629
630         /* Do not wait for response */
631         ptlrpcd_add_req(req);
632         RETURN(0);
633 }
634
635 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
636                                 long writing_bytes)
637 {
638         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
639
640         LASSERT(!(oa->o_valid & bits));
641
642         oa->o_valid |= bits;
643         spin_lock(&cli->cl_loi_list_lock);
644         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
645                 oa->o_dirty = cli->cl_dirty_grant;
646         else
647                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
648         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
649                      cli->cl_dirty_max_pages)) {
650                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
651                        cli->cl_dirty_pages, cli->cl_dirty_transit,
652                        cli->cl_dirty_max_pages);
653                 oa->o_undirty = 0;
654         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
655                             atomic_long_read(&obd_dirty_transit_pages) >
656                             (long)(obd_max_dirty_pages + 1))) {
657                 /* The atomic_read() allowing the atomic_inc() are
658                  * not covered by a lock thus they may safely race and trip
659                  * this CERROR() unless we add in a small fudge factor (+1). */
660                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
661                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
662                        atomic_long_read(&obd_dirty_transit_pages),
663                        obd_max_dirty_pages);
664                 oa->o_undirty = 0;
665         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
666                             0x7fffffff)) {
667                 CERROR("dirty %lu - dirty_max %lu too big???\n",
668                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
669                 oa->o_undirty = 0;
670         } else {
671                 unsigned long nrpages;
672                 unsigned long undirty;
673
674                 nrpages = cli->cl_max_pages_per_rpc;
675                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
676                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
677                 undirty = nrpages << PAGE_SHIFT;
678                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
679                                  GRANT_PARAM)) {
680                         int nrextents;
681
682                         /* take extent tax into account when asking for more
683                          * grant space */
684                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
685                                      cli->cl_max_extent_pages;
686                         undirty += nrextents * cli->cl_grant_extent_tax;
687                 }
688                 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
689                  * to add extent tax, etc.
690                  */
691                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
692                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
693         }
694         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
695         oa->o_dropped = cli->cl_lost_grant;
696         cli->cl_lost_grant = 0;
697         spin_unlock(&cli->cl_loi_list_lock);
698         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
699                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
700 }
701
702 void osc_update_next_shrink(struct client_obd *cli)
703 {
704         cli->cl_next_shrink_grant = ktime_get_seconds() +
705                                     cli->cl_grant_shrink_interval;
706
707         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
708                cli->cl_next_shrink_grant);
709 }
710
711 static void __osc_update_grant(struct client_obd *cli, u64 grant)
712 {
713         spin_lock(&cli->cl_loi_list_lock);
714         cli->cl_avail_grant += grant;
715         spin_unlock(&cli->cl_loi_list_lock);
716 }
717
718 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
719 {
720         if (body->oa.o_valid & OBD_MD_FLGRANT) {
721                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
722                 __osc_update_grant(cli, body->oa.o_grant);
723         }
724 }
725
/**
 * grant thread data for shrinking space.
 *
 * A single instance, client_gtd, is shared by the grant work handler.
 */
struct grant_thread_data {
        /* clients walked by osc_grant_work_handler(), linked through
         * client_obd::cl_grant_chain */
        struct list_head        gtd_clients;
        /* held while gtd_clients is being walked */
        struct mutex            gtd_mutex;
        /* once set, the work handler no longer reschedules itself */
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;
735
736 static int osc_shrink_grant_interpret(const struct lu_env *env,
737                                       struct ptlrpc_request *req,
738                                       void *args, int rc)
739 {
740         struct osc_grant_args *aa = args;
741         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
742         struct ost_body *body;
743
744         if (rc != 0) {
745                 __osc_update_grant(cli, aa->aa_oa->o_grant);
746                 GOTO(out, rc);
747         }
748
749         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
750         LASSERT(body);
751         osc_update_grant(cli, body);
752 out:
753         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
754         aa->aa_oa = NULL;
755
756         return rc;
757 }
758
759 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
760 {
761         spin_lock(&cli->cl_loi_list_lock);
762         oa->o_grant = cli->cl_avail_grant / 4;
763         cli->cl_avail_grant -= oa->o_grant;
764         spin_unlock(&cli->cl_loi_list_lock);
765         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
766                 oa->o_valid |= OBD_MD_FLFLAGS;
767                 oa->o_flags = 0;
768         }
769         oa->o_flags |= OBD_FL_SHRINK_GRANT;
770         osc_update_next_shrink(cli);
771 }
772
773 /* Shrink the current grant, either from some large amount to enough for a
774  * full set of in-flight RPCs, or if we have already shrunk to that limit
775  * then to enough for a single RPC.  This avoids keeping more grant than
776  * needed, and avoids shrinking the grant piecemeal. */
777 static int osc_shrink_grant(struct client_obd *cli)
778 {
779         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
780                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
781
782         spin_lock(&cli->cl_loi_list_lock);
783         if (cli->cl_avail_grant <= target_bytes)
784                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
785         spin_unlock(&cli->cl_loi_list_lock);
786
787         return osc_shrink_grant_to_target(cli, target_bytes);
788 }
789
790 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
791 {
792         int                     rc = 0;
793         struct ost_body        *body;
794         ENTRY;
795
796         spin_lock(&cli->cl_loi_list_lock);
797         /* Don't shrink if we are already above or below the desired limit
798          * We don't want to shrink below a single RPC, as that will negatively
799          * impact block allocation and long-term performance. */
800         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
801                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
802
803         if (target_bytes >= cli->cl_avail_grant) {
804                 spin_unlock(&cli->cl_loi_list_lock);
805                 RETURN(0);
806         }
807         spin_unlock(&cli->cl_loi_list_lock);
808
809         OBD_ALLOC_PTR(body);
810         if (!body)
811                 RETURN(-ENOMEM);
812
813         osc_announce_cached(cli, &body->oa, 0);
814
815         spin_lock(&cli->cl_loi_list_lock);
816         if (target_bytes >= cli->cl_avail_grant) {
817                 /* available grant has changed since target calculation */
818                 spin_unlock(&cli->cl_loi_list_lock);
819                 GOTO(out_free, rc = 0);
820         }
821         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
822         cli->cl_avail_grant = target_bytes;
823         spin_unlock(&cli->cl_loi_list_lock);
824         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
825                 body->oa.o_valid |= OBD_MD_FLFLAGS;
826                 body->oa.o_flags = 0;
827         }
828         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
829         osc_update_next_shrink(cli);
830
831         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
832                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
833                                 sizeof(*body), body, NULL);
834         if (rc != 0)
835                 __osc_update_grant(cli, body->oa.o_grant);
836 out_free:
837         OBD_FREE_PTR(body);
838         RETURN(rc);
839 }
840
841 static int osc_should_shrink_grant(struct client_obd *client)
842 {
843         time64_t next_shrink = client->cl_next_shrink_grant;
844
845         if (client->cl_import == NULL)
846                 return 0;
847
848         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
849              OBD_CONNECT_GRANT_SHRINK) == 0)
850                 return 0;
851
852         if (ktime_get_seconds() >= next_shrink - 5) {
853                 /* Get the current RPC size directly, instead of going via:
854                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
855                  * Keep comment here so that it can be found by searching. */
856                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
857
858                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
859                     client->cl_avail_grant > brw_size)
860                         return 1;
861                 else
862                         osc_update_next_shrink(client);
863         }
864         return 0;
865 }
866
/* Upper bound on the number of clients shrunk by one run of
 * osc_grant_work_handler(). */
#define GRANT_SHRINK_RPC_BATCH  100

/* Delayed work item that runs osc_grant_work_handler(). */
static struct delayed_work work;
870
871 static void osc_grant_work_handler(struct work_struct *data)
872 {
873         struct client_obd *cli;
874         int rpc_sent;
875         bool init_next_shrink = true;
876         time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
877
878         rpc_sent = 0;
879         mutex_lock(&client_gtd.gtd_mutex);
880         list_for_each_entry(cli, &client_gtd.gtd_clients,
881                             cl_grant_chain) {
882                 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
883                     osc_should_shrink_grant(cli)) {
884                         osc_shrink_grant(cli);
885                         rpc_sent++;
886                 }
887
888                 if (!init_next_shrink) {
889                         if (cli->cl_next_shrink_grant < next_shrink &&
890                             cli->cl_next_shrink_grant > ktime_get_seconds())
891                                 next_shrink = cli->cl_next_shrink_grant;
892                 } else {
893                         init_next_shrink = false;
894                         next_shrink = cli->cl_next_shrink_grant;
895                 }
896         }
897         mutex_unlock(&client_gtd.gtd_mutex);
898
899         if (client_gtd.gtd_stopped == 1)
900                 return;
901
902         if (next_shrink > ktime_get_seconds()) {
903                 time64_t delay = next_shrink - ktime_get_seconds();
904
905                 schedule_delayed_work(&work, cfs_time_seconds(delay));
906         } else {
907                 schedule_work(&work.work);
908         }
909 }
910
/**
 * Force the grant work to run as soon as possible: cancel any pending
 * delayed instance first, then queue the work item without delay.
 */
void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}
916
917 /**
918  * Start grant thread for returing grant to server for idle clients.
919  */
920 static int osc_start_grant_work(void)
921 {
922         client_gtd.gtd_stopped = 0;
923         mutex_init(&client_gtd.gtd_mutex);
924         INIT_LIST_HEAD(&client_gtd.gtd_clients);
925
926         INIT_DELAYED_WORK(&work, osc_grant_work_handler);
927         schedule_work(&work.work);
928
929         return 0;
930 }
931
/**
 * Stop the grant work.  The stop flag is raised before cancelling so that
 * osc_grant_work_handler() does not re-arm itself.
 */
static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}
937
/**
 * Register \a client on the global list scanned by the grant work,
 * under client_gtd.gtd_mutex.
 */
static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}
944
945 static void osc_del_grant_list(struct client_obd *client)
946 {
947         if (list_empty(&client->cl_grant_chain))
948                 return;
949
950         mutex_lock(&client_gtd.gtd_mutex);
951         list_del_init(&client->cl_grant_chain);
952         mutex_unlock(&client_gtd.gtd_mutex);
953 }
954
955 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
956 {
957         /*
958          * ocd_grant is the total grant amount we're expect to hold: if we've
959          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
960          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
961          * dirty.
962          *
963          * race is tolerable here: if we're evicted, but imp_state already
964          * left EVICTED state, then cl_dirty_pages must be 0 already.
965          */
966         spin_lock(&cli->cl_loi_list_lock);
967         cli->cl_avail_grant = ocd->ocd_grant;
968         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
969                 cli->cl_avail_grant -= cli->cl_reserved_grant;
970                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
971                         cli->cl_avail_grant -= cli->cl_dirty_grant;
972                 else
973                         cli->cl_avail_grant -=
974                                         cli->cl_dirty_pages << PAGE_SHIFT;
975         }
976
977         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
978                 u64 size;
979                 int chunk_mask;
980
981                 /* overhead for each extent insertion */
982                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
983                 /* determine the appropriate chunk size used by osc_extent. */
984                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
985                                           ocd->ocd_grant_blkbits);
986                 /* max_pages_per_rpc must be chunk aligned */
987                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
988                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
989                                              ~chunk_mask) & chunk_mask;
990                 /* determine maximum extent size, in #pages */
991                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
992                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
993                 if (cli->cl_max_extent_pages == 0)
994                         cli->cl_max_extent_pages = 1;
995         } else {
996                 cli->cl_grant_extent_tax = 0;
997                 cli->cl_chunkbits = PAGE_SHIFT;
998                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
999         }
1000         spin_unlock(&cli->cl_loi_list_lock);
1001
1002         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1003                 "chunk bits: %d cl_max_extent_pages: %d\n",
1004                 cli_name(cli),
1005                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1006                 cli->cl_max_extent_pages);
1007
1008         if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1009                 osc_add_grant_list(cli);
1010 }
1011 EXPORT_SYMBOL(osc_init_grant);
1012
1013 /* We assume that the reason this OSC got a short read is because it read
1014  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1015  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1016  * this stripe never got written at or beyond this stripe offset yet. */
1017 static void handle_short_read(int nob_read, size_t page_count,
1018                               struct brw_page **pga)
1019 {
1020         char *ptr;
1021         int i = 0;
1022
1023         /* skip bytes read OK */
1024         while (nob_read > 0) {
1025                 LASSERT (page_count > 0);
1026
1027                 if (pga[i]->count > nob_read) {
1028                         /* EOF inside this page */
1029                         ptr = kmap(pga[i]->pg) +
1030                                 (pga[i]->off & ~PAGE_MASK);
1031                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1032                         kunmap(pga[i]->pg);
1033                         page_count--;
1034                         i++;
1035                         break;
1036                 }
1037
1038                 nob_read -= pga[i]->count;
1039                 page_count--;
1040                 i++;
1041         }
1042
1043         /* zero remaining pages */
1044         while (page_count-- > 0) {
1045                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1046                 memset(ptr, 0, pga[i]->count);
1047                 kunmap(pga[i]->pg);
1048                 i++;
1049         }
1050 }
1051
1052 static int check_write_rcs(struct ptlrpc_request *req,
1053                            int requested_nob, int niocount,
1054                            size_t page_count, struct brw_page **pga)
1055 {
1056         int     i;
1057         __u32   *remote_rcs;
1058
1059         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1060                                                   sizeof(*remote_rcs) *
1061                                                   niocount);
1062         if (remote_rcs == NULL) {
1063                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1064                 return(-EPROTO);
1065         }
1066
1067         /* return error if any niobuf was in error */
1068         for (i = 0; i < niocount; i++) {
1069                 if ((int)remote_rcs[i] < 0) {
1070                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1071                                i, remote_rcs[i], req);
1072                         return remote_rcs[i];
1073                 }
1074
1075                 if (remote_rcs[i] != 0) {
1076                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1077                                 i, remote_rcs[i], req);
1078                         return(-EPROTO);
1079                 }
1080         }
1081         if (req->rq_bulk != NULL &&
1082             req->rq_bulk->bd_nob_transferred != requested_nob) {
1083                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1084                        req->rq_bulk->bd_nob_transferred, requested_nob);
1085                 return(-EPROTO);
1086         }
1087
1088         return (0);
1089 }
1090
1091 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1092 {
1093         if (p1->flag != p2->flag) {
1094                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1095                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1096                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1097
1098                 /* warn if we try to combine flags that we don't know to be
1099                  * safe to combine */
1100                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1101                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1102                               "report this at https://jira.whamcloud.com/\n",
1103                               p1->flag, p2->flag);
1104                 }
1105                 return 0;
1106         }
1107
1108         return (p1->off + p1->count == p2->off);
1109 }
1110
1111 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1112 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1113                                    size_t pg_count, struct brw_page **pga,
1114                                    int opc, obd_dif_csum_fn *fn,
1115                                    int sector_size,
1116                                    u32 *check_sum)
1117 {
1118         struct ahash_request *req;
1119         /* Used Adler as the default checksum type on top of DIF tags */
1120         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1121         struct page *__page;
1122         unsigned char *buffer;
1123         __u16 *guard_start;
1124         unsigned int bufsize;
1125         int guard_number;
1126         int used_number = 0;
1127         int used;
1128         u32 cksum;
1129         int rc = 0;
1130         int i = 0;
1131
1132         LASSERT(pg_count > 0);
1133
1134         __page = alloc_page(GFP_KERNEL);
1135         if (__page == NULL)
1136                 return -ENOMEM;
1137
1138         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1139         if (IS_ERR(req)) {
1140                 rc = PTR_ERR(req);
1141                 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1142                        obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1143                 GOTO(out, rc);
1144         }
1145
1146         buffer = kmap(__page);
1147         guard_start = (__u16 *)buffer;
1148         guard_number = PAGE_SIZE / sizeof(*guard_start);
1149         while (nob > 0 && pg_count > 0) {
1150                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1151
1152                 /* corrupt the data before we compute the checksum, to
1153                  * simulate an OST->client data error */
1154                 if (unlikely(i == 0 && opc == OST_READ &&
1155                              OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1156                         unsigned char *ptr = kmap(pga[i]->pg);
1157                         int off = pga[i]->off & ~PAGE_MASK;
1158
1159                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1160                         kunmap(pga[i]->pg);
1161                 }
1162
1163                 /*
1164                  * The left guard number should be able to hold checksums of a
1165                  * whole page
1166                  */
1167                 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1168                                                   pga[i]->off & ~PAGE_MASK,
1169                                                   count,
1170                                                   guard_start + used_number,
1171                                                   guard_number - used_number,
1172                                                   &used, sector_size,
1173                                                   fn);
1174                 if (rc)
1175                         break;
1176
1177                 used_number += used;
1178                 if (used_number == guard_number) {
1179                         cfs_crypto_hash_update_page(req, __page, 0,
1180                                 used_number * sizeof(*guard_start));
1181                         used_number = 0;
1182                 }
1183
1184                 nob -= pga[i]->count;
1185                 pg_count--;
1186                 i++;
1187         }
1188         kunmap(__page);
1189         if (rc)
1190                 GOTO(out, rc);
1191
1192         if (used_number != 0)
1193                 cfs_crypto_hash_update_page(req, __page, 0,
1194                         used_number * sizeof(*guard_start));
1195
1196         bufsize = sizeof(cksum);
1197         cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1198
1199         /* For sending we only compute the wrong checksum instead
1200          * of corrupting the data so it is still correct on a redo */
1201         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1202                 cksum++;
1203
1204         *check_sum = cksum;
1205 out:
1206         __free_page(__page);
1207         return rc;
1208 }
1209 #else /* !CONFIG_CRC_T10DIF */
1210 #define obd_dif_ip_fn NULL
1211 #define obd_dif_crc_fn NULL
1212 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
1213         -EOPNOTSUPP
1214 #endif /* CONFIG_CRC_T10DIF */
1215
1216 static int osc_checksum_bulk(int nob, size_t pg_count,
1217                              struct brw_page **pga, int opc,
1218                              enum cksum_types cksum_type,
1219                              u32 *cksum)
1220 {
1221         int                             i = 0;
1222         struct ahash_request           *req;
1223         unsigned int                    bufsize;
1224         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1225
1226         LASSERT(pg_count > 0);
1227
1228         req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1229         if (IS_ERR(req)) {
1230                 CERROR("Unable to initialize checksum hash %s\n",
1231                        cfs_crypto_hash_name(cfs_alg));
1232                 return PTR_ERR(req);
1233         }
1234
1235         while (nob > 0 && pg_count > 0) {
1236                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1237
1238                 /* corrupt the data before we compute the checksum, to
1239                  * simulate an OST->client data error */
1240                 if (i == 0 && opc == OST_READ &&
1241                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1242                         unsigned char *ptr = kmap(pga[i]->pg);
1243                         int off = pga[i]->off & ~PAGE_MASK;
1244
1245                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1246                         kunmap(pga[i]->pg);
1247                 }
1248                 cfs_crypto_hash_update_page(req, pga[i]->pg,
1249                                             pga[i]->off & ~PAGE_MASK,
1250                                             count);
1251                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1252                                (int)(pga[i]->off & ~PAGE_MASK));
1253
1254                 nob -= pga[i]->count;
1255                 pg_count--;
1256                 i++;
1257         }
1258
1259         bufsize = sizeof(*cksum);
1260         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1261
1262         /* For sending we only compute the wrong checksum instead
1263          * of corrupting the data so it is still correct on a redo */
1264         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1265                 (*cksum)++;
1266
1267         return 0;
1268 }
1269
1270 static int osc_checksum_bulk_rw(const char *obd_name,
1271                                 enum cksum_types cksum_type,
1272                                 int nob, size_t pg_count,
1273                                 struct brw_page **pga, int opc,
1274                                 u32 *check_sum)
1275 {
1276         obd_dif_csum_fn *fn = NULL;
1277         int sector_size = 0;
1278         int rc;
1279
1280         ENTRY;
1281         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1282
1283         if (fn)
1284                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1285                                              opc, fn, sector_size, check_sum);
1286         else
1287                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1288                                        check_sum);
1289
1290         RETURN(rc);
1291 }
1292
/**
 * Build an OST_READ or OST_WRITE request for the page array \a pga.
 *
 * Contiguous pages with identical flags are merged into one remote niobuf
 * (see can_merge_pages()).  If the total size fits cl_max_short_io_bytes,
 * only one niobuf is needed and the import supports it, the data travels
 * inline in the request/reply ("short io") instead of through a bulk
 * descriptor.  For writes the checksum is computed here when enabled.
 *
 * \param[in]  cmd         OBD_BRW_WRITE bit set selects a write, else a read
 * \param[in]  cli         client the request is sent from
 * \param[in,out] oa       object attributes; copied to the wire, and for
 *                         writes updated with checksum flags/value
 * \param[in]  page_count  number of entries in \a pga, must be > 0
 * \param[in]  pga         pages to transfer, in increasing offset order
 * \param[out] reqp        set to the prepared request on success
 * \param[in]  resend      non-zero marks the request OBD_FL_RECOV_RESEND
 *
 * \retval 0            success
 * \retval -ENOMEM      request or bulk descriptor allocation failed (also
 *                      returned by fault injection)
 * \retval -EINVAL      fault injection only
 * \retval negative     request packing or checksum failure
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes are allocated from the preallocated osc_rq_pool */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs needed after merging adjacent pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        /* total payload size, candidate for short io */
        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        /* short io data lives in the request for writes and in the reply
         * for reads */
        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        /* short io needs no bulk descriptor */
        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, variable
         * oa contains valid o_uid and o_gid in these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
         * other process logic */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                /* o_flags is only meaningful once OBD_MD_FLFLAGS is set */
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* attach every page to the transfer and fill in the niobufs */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        /* short io write: copy the data into the request */
                        unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        ll_kunmap_atomic(ptr, KM_USER0);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                /* a mergeable page extends the previous niobuf; step back
                 * so the loop increment lands on the same niobuf again */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* exactly niocount niobufs must have been filled in */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this RPC when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                        rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                  requested_nob, page_count,
                                                  pga, OST_WRITE,
                                                  &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                           cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only announce the checksum type the client wants */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
                 * resent due to cksum error, this will allow Server to
                 * check+dump pages on its side */
        }
        ptlrpc_request_set_replen(req);

        /* stash what the reply handling needs in the request's async args */
        aa = ptlrpc_req_async_args(aa, req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        /* dropping the request also frees the bulk descriptor it owns */
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1570
/* Path of the checksum-error page dump file; filled in by
 * dump_all_bulk_pages(). */
char dbgcksum_file_name[PATH_MAX];
1572
1573 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1574                                 struct brw_page **pga, __u32 server_cksum,
1575                                 __u32 client_cksum)
1576 {
1577         struct file *filp;
1578         int rc, i;
1579         unsigned int len;
1580         char *buf;
1581
1582         /* will only keep dump of pages on first error for the same range in
1583          * file/fid, not during the resends/retries. */
1584         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1585                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1586                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1587                   libcfs_debug_file_path_arr :
1588                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1591                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1592                  pga[0]->off,
1593                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1594                  client_cksum, server_cksum);
1595         filp = filp_open(dbgcksum_file_name,
1596                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1597         if (IS_ERR(filp)) {
1598                 rc = PTR_ERR(filp);
1599                 if (rc == -EEXIST)
1600                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1601                                "checksum error: rc = %d\n", dbgcksum_file_name,
1602                                rc);
1603                 else
1604                         CERROR("%s: can't open to dump pages with checksum "
1605                                "error: rc = %d\n", dbgcksum_file_name, rc);
1606                 return;
1607         }
1608
1609         for (i = 0; i < page_count; i++) {
1610                 len = pga[i]->count;
1611                 buf = kmap(pga[i]->pg);
1612                 while (len != 0) {
1613                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1614                         if (rc < 0) {
1615                                 CERROR("%s: wanted to write %u but got %d "
1616                                        "error\n", dbgcksum_file_name, len, rc);
1617                                 break;
1618                         }
1619                         len -= rc;
1620                         buf += rc;
1621                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1622                                dbgcksum_file_name, rc);
1623                 }
1624                 kunmap(pga[i]->pg);
1625         }
1626
1627         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1628         if (rc)
1629                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1630         filp_close(filp, NULL);
1631         return;
1632 }
1633
1634 static int
1635 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1636                      __u32 client_cksum, __u32 server_cksum,
1637                      struct osc_brw_async_args *aa)
1638 {
1639         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1640         enum cksum_types cksum_type;
1641         obd_dif_csum_fn *fn = NULL;
1642         int sector_size = 0;
1643         __u32 new_cksum;
1644         char *msg;
1645         int rc;
1646
1647         if (server_cksum == client_cksum) {
1648                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1649                 return 0;
1650         }
1651
1652         if (aa->aa_cli->cl_checksum_dump)
1653                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1654                                     server_cksum, client_cksum);
1655
1656         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1657                                            oa->o_flags : 0);
1658
1659         switch (cksum_type) {
1660         case OBD_CKSUM_T10IP512:
1661                 fn = obd_dif_ip_fn;
1662                 sector_size = 512;
1663                 break;
1664         case OBD_CKSUM_T10IP4K:
1665                 fn = obd_dif_ip_fn;
1666                 sector_size = 4096;
1667                 break;
1668         case OBD_CKSUM_T10CRC512:
1669                 fn = obd_dif_crc_fn;
1670                 sector_size = 512;
1671                 break;
1672         case OBD_CKSUM_T10CRC4K:
1673                 fn = obd_dif_crc_fn;
1674                 sector_size = 4096;
1675                 break;
1676         default:
1677                 break;
1678         }
1679
1680         if (fn)
1681                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1682                                              aa->aa_page_count, aa->aa_ppga,
1683                                              OST_WRITE, fn, sector_size,
1684                                              &new_cksum);
1685         else
1686                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1687                                        aa->aa_ppga, OST_WRITE, cksum_type,
1688                                        &new_cksum);
1689
1690         if (rc < 0)
1691                 msg = "failed to calculate the client write checksum";
1692         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1693                 msg = "the server did not use the checksum type specified in "
1694                       "the original request - likely a protocol problem";
1695         else if (new_cksum == server_cksum)
1696                 msg = "changed on the client after we checksummed it - "
1697                       "likely false positive due to mmap IO (bug 11742)";
1698         else if (new_cksum == client_cksum)
1699                 msg = "changed in transit before arrival at OST";
1700         else
1701                 msg = "changed in transit AND doesn't match the original - "
1702                       "likely false positive due to mmap IO (bug 11742)";
1703
1704         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1705                            DFID " object "DOSTID" extent [%llu-%llu], original "
1706                            "client csum %x (type %x), server csum %x (type %x),"
1707                            " client csum now %x\n",
1708                            obd_name, msg, libcfs_nid2str(peer->nid),
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1711                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1712                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1713                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1714                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1715                            client_cksum,
1716                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1717                            server_cksum, cksum_type, new_cksum);
1718         return 1;
1719 }
1720
/*
 * Post-process the reply of a BRW (bulk read/write) request.
 *
 * Note rc enters this function as number of bytes transferred
 * (or a negative errno).
 *
 * For writes: applies quota flags and grant from the reply, verifies the
 * write checksum and the per-niobuf return codes.
 * For reads: validates the transferred size, copies short-io data into the
 * pages, handles short reads and verifies the read checksum.
 *
 * On a non-negative result the reply obdo is copied into aa->aa_oa.
 *
 * Returns a negative errno on failure; -EAGAIN is used for failures the
 * code below treats as retryable (bulk unwrap failure, checksum mismatch).
 * NOTE(review): on the read path where a checksum was requested but the
 * server did not send one, rc is left holding the byte count rather than
 * being reset to 0 -- confirm that callers expect this.
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        /* the async args live inside the request itself */
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd *cli = aa->aa_cli;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;

        ENTRY;

        /* -EDQUOT is let through so quota flags and grant in the reply
         * body are still processed below */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "cannot unpack body");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid/projid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
                unsigned qid[LL_MAXQUOTAS] = {
                                         body->oa.o_uid, body->oa.o_gid,
                                         body->oa.o_projid };
                CDEBUG(D_QUOTA,
                       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
                       osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
                                       body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        /* only -EDQUOT can still be negative here */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a write reply is not expected to carry a byte count */
                if (rc > 0) {
                        CERROR("%s: unexpected positive size %d\n",
                               obd_name, rc);
                        RETURN(-EPROTO);
                }

                if (req->rq_bulk != NULL &&
                    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* check_write_checksum() returns non-zero on mismatch */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count, aa->aa_page_count,
                                     aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        if (req->rq_bulk == NULL) {
                /* no bulk descriptor: the data came back inside the reply
                 * (short io); its size is the size of the SHORT_IO field */
                rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
                                          RCL_SERVER);
                LASSERT(rc == req->rq_status);
        } else {
                /* if unwrap_bulk failed, return -EAGAIN to retry */
                rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        }
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        /* from here on rc is the number of bytes actually read */
        if (rc > aa->aa_requested_nob) {
                CERROR("%s: unexpected size %d, requested %d\n", obd_name,
                       rc, aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk == NULL) {
                /* short io */
                int nob, pg_count, i = 0;
                unsigned char *buf;

                CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
                pg_count = aa->aa_page_count;
                buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
                                                   rc);
                nob = rc;
                /* copy the inline data into the pages, page by page, until
                 * either the data or the pages run out */
                while (nob > 0 && pg_count > 0) {
                        unsigned char *ptr;
                        int count = aa->aa_ppga[i]->count > nob ?
                                    nob : aa->aa_ppga[i]->count;

                        CDEBUG(D_CACHE, "page %p count %d\n",
                               aa->aa_ppga[i]->pg, count);
                        ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
                        /* off & ~PAGE_MASK is the offset within the page */
                        memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
                               count);
                        ll_kunmap_atomic((void *) ptr, KM_USER0);

                        buf += count;
                        nob -= count;
                        i++;
                        pg_count--;
                }
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                /* counts consecutive good checksums; reset on a mismatch
                 * and not read anywhere in this function */
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                enum cksum_types cksum_type;
                u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
                        body->oa.o_flags : 0;

                /* recompute the checksum locally over the received bytes,
                 * using the checksum type from the reply flags */
                cksum_type = obd_cksum_type_unpack(o_flags);
                rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
                                          aa->aa_page_count, aa->aa_ppga,
                                          OST_READ, &client_cksum);
                if (rc < 0)
                        GOTO(out, rc);

                /* the bulk sender differs from the peer: include it in the
                 * error message */
                if (req->rq_bulk != NULL &&
                    peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        struct ost_body *clbody;
                        u32 page_count = aa->aa_page_count;

                        /* the request body supplies the parent FID */
                        clbody = req_capsule_client_get(&req->rq_pill,
                                                        &RMF_OST_BODY);
                        if (cli->cl_checksum_dump)
                                dump_all_bulk_pages(&clbody->oa, page_count,
                                                    aa->aa_ppga, server_cksum,
                                                    client_cksum);

                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent [%llu-%llu], client %x, "
                                           "server %x, cksum_type %x\n",
                                           obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_seq : 0ULL,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_oid : 0,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[page_count-1]->off +
                                           aa->aa_ppga[page_count-1]->count - 1,
                                           client_cksum, server_cksum,
                                           cksum_type);
                        cksum_counter = 0;
                        /* keep the locally computed checksum in the obdo */
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                /* a checksum was sent with the request but none came back */
                static int cksum_missed;

                cksum_missed++;
                /* true only when a single bit is set, i.e. the message is
                 * printed on the 1st, 2nd, 4th, 8th, ... occurrence */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("%s: checksum %u requested from %s but not sent\n",
                               obd_name, cksum_missed,
                               libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success take over the attributes returned by the server */
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1925
1926 static int osc_brw_redo_request(struct ptlrpc_request *request,
1927                                 struct osc_brw_async_args *aa, int rc)
1928 {
1929         struct ptlrpc_request *new_req;
1930         struct osc_brw_async_args *new_aa;
1931         struct osc_async_page *oap;
1932         ENTRY;
1933
1934         /* The below message is checked in replay-ost-single.sh test_8ae*/
1935         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1936                   "redo for recoverable error %d", rc);
1937
1938         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1939                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1940                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1941                                   aa->aa_ppga, &new_req, 1);
1942         if (rc)
1943                 RETURN(rc);
1944
1945         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1946                 if (oap->oap_request != NULL) {
1947                         LASSERTF(request == oap->oap_request,
1948                                  "request %p != oap_request %p\n",
1949                                  request, oap->oap_request);
1950                         if (oap->oap_interrupted) {
1951                                 ptlrpc_req_finished(new_req);
1952                                 RETURN(-EINTR);
1953                         }
1954                 }
1955         }
1956         /*
1957          * New request takes over pga and oaps from old request.
1958          * Note that copying a list_head doesn't work, need to move it...
1959          */
1960         aa->aa_resends++;
1961         new_req->rq_interpret_reply = request->rq_interpret_reply;
1962         new_req->rq_async_args = request->rq_async_args;
1963         new_req->rq_commit_cb = request->rq_commit_cb;
1964         /* cap resend delay to the current request timeout, this is similar to
1965          * what ptlrpc does (see after_reply()) */
1966         if (aa->aa_resends > new_req->rq_timeout)
1967                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1968         else
1969                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1970         new_req->rq_generation_set = 1;
1971         new_req->rq_import_generation = request->rq_import_generation;
1972
1973         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1974
1975         INIT_LIST_HEAD(&new_aa->aa_oaps);
1976         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1977         INIT_LIST_HEAD(&new_aa->aa_exts);
1978         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1979         new_aa->aa_resends = aa->aa_resends;
1980
1981         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1982                 if (oap->oap_request) {
1983                         ptlrpc_req_finished(oap->oap_request);
1984                         oap->oap_request = ptlrpc_request_addref(new_req);
1985                 }
1986         }
1987
1988         /* XXX: This code will run into problem if we're going to support
1989          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1990          * and wait for all of them to be finished. We should inherit request
1991          * set from old request. */
1992         ptlrpcd_add_req(new_req);
1993
1994         DEBUG_REQ(D_INFO, new_req, "new request");
1995         RETURN(0);
1996 }
1997
1998 /*
1999  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2000  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2001  * fine for our small page arrays and doesn't require allocation.  its an
2002  * insertion sort that swaps elements that are strides apart, shrinking the
2003  * stride down until its '1' and the array is sorted.
2004  */
2005 static void sort_brw_pages(struct brw_page **array, int num)
2006 {
2007         int stride, i, j;
2008         struct brw_page *tmp;
2009
2010         if (num == 1)
2011                 return;
2012         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2013                 ;
2014
2015         do {
2016                 stride /= 3;
2017                 for (i = stride ; i < num ; i++) {
2018                         tmp = array[i];
2019                         j = i;
2020                         while (j >= stride && array[j - stride]->off > tmp->off) {
2021                                 array[j] = array[j - stride];
2022                                 j -= stride;
2023                         }
2024                         array[j] = tmp;
2025                 }
2026         } while (stride > 1);
2027 }
2028
/*
 * Free an array of \a count brw_page pointers.  Only the pointer array
 * itself is released here; \a ppga must not be NULL.
 */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, count * sizeof(ppga[0]));
}
2034
/*
 * Reply-interpret callback for BRW requests.
 *
 * Finishes the request through osc_brw_fini_request(), retries it when the
 * error is recoverable, otherwise updates the cached object attributes on
 * success and releases everything the request was holding: the obdo, the
 * extents, the page array and the in-flight RPC accounting.
 *
 * env:  environment passed on to the cl_object/osc helpers
 * req:  the completed request
 * args: struct osc_brw_async_args of the request
 * rc:   handed unchanged to osc_brw_fini_request()
 *
 * Returns 0 on success or when a replacement request was queued, a
 * negative errno otherwise.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_brw_async_args *aa = args;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        unsigned long transferred = 0;

        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /*
         * When server returns -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.
         */
        if (osc_recoverable_error(rc) && !req->rq_no_delay) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import generation changed since the request was
                         * sent: do not resend */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               "%llu:%llu, rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here means a replacement request was queued and
                 * has taken over the pages and extents; nothing to clean */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* the object is found through the last page of the array */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* copy each attribute the obdo marks as valid */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        /* end offset of the last page written */
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        /* the obdo is no longer needed past this point */
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        /* finish every extent; a failed no-delay request is reported to
         * the extents as -EWOULDBLOCK */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1,
                                  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        /* byte count for the stats: without a bulk descriptor the requested
         * size is used */
        transferred = (req->rq_bulk == NULL ? /* short io */
                       aa->aa_requested_nob :
                       req->rq_bulk->bd_nob_transferred);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
2161
2162 static void brw_commit(struct ptlrpc_request *req)
2163 {
2164         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2165          * this called via the rq_commit_cb, I need to ensure
2166          * osc_dec_unstable_pages is still called. Otherwise unstable
2167          * pages may be leaked. */
2168         spin_lock(&req->rq_lock);
2169         if (likely(req->rq_unstable)) {
2170                 req->rq_unstable = 0;
2171                 spin_unlock(&req->rq_lock);
2172
2173                 osc_dec_unstable_pages(req);
2174         } else {
2175                 req->rq_committed = 1;
2176                 spin_unlock(&req->rq_lock);
2177         }
2178 }
2179
2180 /**
2181  * Build an RPC by the list of extent @ext_list. The caller must ensure
2182  * that the total pages in this list are NOT over max pages per RPC.
2183  * Extents in the list must be in OES_RPC state.
2184  */
2185 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2186                   struct list_head *ext_list, int cmd)
2187 {
2188         struct ptlrpc_request           *req = NULL;
2189         struct osc_extent               *ext;
2190         struct brw_page                 **pga = NULL;
2191         struct osc_brw_async_args       *aa = NULL;
2192         struct obdo                     *oa = NULL;
2193         struct osc_async_page           *oap;
2194         struct osc_object               *obj = NULL;
2195         struct cl_req_attr              *crattr = NULL;
2196         loff_t                          starting_offset = OBD_OBJECT_EOF;
2197         loff_t                          ending_offset = 0;
2198         int                             mpflag = 0;
2199         int                             mem_tight = 0;
2200         int                             page_count = 0;
2201         bool                            soft_sync = false;
2202         bool                            interrupted = false;
2203         bool                            ndelay = false;
2204         int                             i;
2205         int                             grant = 0;
2206         int                             rc;
2207         __u32                           layout_version = 0;
2208         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2209         struct ost_body                 *body;
2210         ENTRY;
2211         LASSERT(!list_empty(ext_list));
2212
2213         /* add pages into rpc_list to build BRW rpc */
2214         list_for_each_entry(ext, ext_list, oe_link) {
2215                 LASSERT(ext->oe_state == OES_RPC);
2216                 mem_tight |= ext->oe_memalloc;
2217                 grant += ext->oe_grants;
2218                 page_count += ext->oe_nr_pages;
2219                 layout_version = MAX(layout_version, ext->oe_layout_version);
2220                 if (obj == NULL)
2221                         obj = ext->oe_obj;
2222         }
2223
2224         soft_sync = osc_over_unstable_soft_limit(cli);
2225         if (mem_tight)
2226                 mpflag = cfs_memory_pressure_get_and_set();
2227
2228         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2229         if (pga == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2233         if (oa == NULL)
2234                 GOTO(out, rc = -ENOMEM);
2235
2236         i = 0;
2237         list_for_each_entry(ext, ext_list, oe_link) {
2238                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2239                         if (mem_tight)
2240                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2241                         if (soft_sync)
2242                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2243                         pga[i] = &oap->oap_brw_page;
2244                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2245                         i++;
2246
2247                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2248                         if (starting_offset == OBD_OBJECT_EOF ||
2249                             starting_offset > oap->oap_obj_off)
2250                                 starting_offset = oap->oap_obj_off;
2251                         else
2252                                 LASSERT(oap->oap_page_off == 0);
2253                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2254                                 ending_offset = oap->oap_obj_off +
2255                                                 oap->oap_count;
2256                         else
2257                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2258                                         PAGE_SIZE);
2259                         if (oap->oap_interrupted)
2260                                 interrupted = true;
2261                 }
2262                 if (ext->oe_ndelay)
2263                         ndelay = true;
2264         }
2265
2266         /* first page in the list */
2267         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2268
2269         crattr = &osc_env_info(env)->oti_req_attr;
2270         memset(crattr, 0, sizeof(*crattr));
2271         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2272         crattr->cra_flags = ~0ULL;
2273         crattr->cra_page = oap2cl_page(oap);
2274         crattr->cra_oa = oa;
2275         cl_req_attr_set(env, osc2cl(obj), crattr);
2276
2277         if (cmd == OBD_BRW_WRITE) {
2278                 oa->o_grant_used = grant;
2279                 if (layout_version > 0) {
2280                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2281                                PFID(&oa->o_oi.oi_fid), layout_version);
2282
2283                         oa->o_layout_version = layout_version;
2284                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2285                 }
2286         }
2287
2288         sort_brw_pages(pga, page_count);
2289         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2290         if (rc != 0) {
2291                 CERROR("prep_req failed: %d\n", rc);
2292                 GOTO(out, rc);
2293         }
2294
2295         req->rq_commit_cb = brw_commit;
2296         req->rq_interpret_reply = brw_interpret;
2297         req->rq_memalloc = mem_tight != 0;
2298         oap->oap_request = ptlrpc_request_addref(req);
2299         if (interrupted && !req->rq_intr)
2300                 ptlrpc_mark_interrupted(req);
2301         if (ndelay) {
2302                 req->rq_no_resend = req->rq_no_delay = 1;
2303                 /* probably set a shorter timeout value.
2304                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2305                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2306         }
2307
2308         /* Need to update the timestamps after the request is built in case
2309          * we race with setattr (locally or in queue at OST).  If OST gets
2310          * later setattr before earlier BRW (as determined by the request xid),
2311          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2312          * way to do this in a single call.  bug 10150 */
2313         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2314         crattr->cra_oa = &body->oa;
2315         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2316         cl_req_attr_set(env, osc2cl(obj), crattr);
2317         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2318
2319         aa = ptlrpc_req_async_args(aa, req);
2320         INIT_LIST_HEAD(&aa->aa_oaps);
2321         list_splice_init(&rpc_list, &aa->aa_oaps);
2322         INIT_LIST_HEAD(&aa->aa_exts);
2323         list_splice_init(ext_list, &aa->aa_exts);
2324
2325         spin_lock(&cli->cl_loi_list_lock);
2326         starting_offset >>= PAGE_SHIFT;
2327         if (cmd == OBD_BRW_READ) {
2328                 cli->cl_r_in_flight++;
2329                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2330                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2331                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2332                                       starting_offset + 1);
2333         } else {
2334                 cli->cl_w_in_flight++;
2335                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2336                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2337                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2338                                       starting_offset + 1);
2339         }
2340         spin_unlock(&cli->cl_loi_list_lock);
2341
2342         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2343                   page_count, aa, cli->cl_r_in_flight,
2344                   cli->cl_w_in_flight);
2345         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2346
2347         ptlrpcd_add_req(req);
2348         rc = 0;
2349         EXIT;
2350
2351 out:
2352         if (mem_tight != 0)
2353                 cfs_memory_pressure_restore(mpflag);
2354
2355         if (rc != 0) {
2356                 LASSERT(req == NULL);
2357
2358                 if (oa)
2359                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2360                 if (pga)
2361                         OBD_FREE(pga, sizeof(*pga) * page_count);
2362                 /* this should happen rarely and is pretty bad, it makes the
2363                  * pending list not follow the dirty order */
2364                 while (!list_empty(ext_list)) {
2365                         ext = list_entry(ext_list->next, struct osc_extent,
2366                                          oe_link);
2367                         list_del_init(&ext->oe_link);
2368                         osc_extent_finish(env, ext, 0, rc);
2369                 }
2370         }
2371         RETURN(rc);
2372 }
2373
2374 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2375 {
2376         int set = 0;
2377
2378         LASSERT(lock != NULL);
2379
2380         lock_res_and_lock(lock);
2381
2382         if (lock->l_ast_data == NULL)
2383                 lock->l_ast_data = data;
2384         if (lock->l_ast_data == data)
2385                 set = 1;
2386
2387         unlock_res_and_lock(lock);
2388
2389         return set;
2390 }
2391
2392 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2393                      void *cookie, struct lustre_handle *lockh,
2394                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2395                      int errcode)
2396 {
2397         bool intent = *flags & LDLM_FL_HAS_INTENT;
2398         int rc;
2399         ENTRY;
2400
2401         /* The request was created before ldlm_cli_enqueue call. */
2402         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2403                 struct ldlm_reply *rep;
2404
2405                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2406                 LASSERT(rep != NULL);
2407
2408                 rep->lock_policy_res1 =
2409                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2410                 if (rep->lock_policy_res1)
2411                         errcode = rep->lock_policy_res1;
2412                 if (!speculative)
2413                         *flags |= LDLM_FL_LVB_READY;
2414         } else if (errcode == ELDLM_OK) {
2415                 *flags |= LDLM_FL_LVB_READY;
2416         }
2417
2418         /* Call the update callback. */
2419         rc = (*upcall)(cookie, lockh, errcode);
2420
2421         /* release the reference taken in ldlm_cli_enqueue() */
2422         if (errcode == ELDLM_LOCK_MATCHED)
2423                 errcode = ELDLM_OK;
2424         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2425                 ldlm_lock_decref(lockh, mode);
2426
2427         RETURN(rc);
2428 }
2429
2430 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2431                           void *args, int rc)
2432 {
2433         struct osc_enqueue_args *aa = args;
2434         struct ldlm_lock *lock;
2435         struct lustre_handle *lockh = &aa->oa_lockh;
2436         enum ldlm_mode mode = aa->oa_mode;
2437         struct ost_lvb *lvb = aa->oa_lvb;
2438         __u32 lvb_len = sizeof(*lvb);
2439         __u64 flags = 0;
2440
2441         ENTRY;
2442
2443         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2444          * be valid. */
2445         lock = ldlm_handle2lock(lockh);
2446         LASSERTF(lock != NULL,
2447                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2448                  lockh->cookie, req, aa);
2449
2450         /* Take an additional reference so that a blocking AST that
2451          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2452          * to arrive after an upcall has been executed by
2453          * osc_enqueue_fini(). */
2454         ldlm_lock_addref(lockh, mode);
2455
2456         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2457         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2458
2459         /* Let CP AST to grant the lock first. */
2460         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2461
2462         if (aa->oa_speculative) {
2463                 LASSERT(aa->oa_lvb == NULL);
2464                 LASSERT(aa->oa_flags == NULL);
2465                 aa->oa_flags = &flags;
2466         }
2467
2468         /* Complete obtaining the lock procedure. */
2469         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2470                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2471                                    lockh, rc);
2472         /* Complete osc stuff. */
2473         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2474                               aa->oa_flags, aa->oa_speculative, rc);
2475
2476         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2477
2478         ldlm_lock_decref(lockh, mode);
2479         LDLM_LOCK_PUT(lock);
2480         RETURN(rc);
2481 }
2482
/* Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, when they are equal, hands the request
 * to ptlrpcd_add_req() instead of adding it to a request set.  The value
 * is only used for comparison there; it does not point to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2484
2485 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2486  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2487  * other synchronous requests, however keeping some locks and trying to obtain
2488  * others may take a considerable amount of time in a case of ost failure; and
2489  * when other sync requests do not get released lock from a client, the client
2490  * is evicted from the cluster -- such scenarious make the life difficult, so
2491  * release locks just after they are obtained. */
2492 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2493                      __u64 *flags, union ldlm_policy_data *policy,
2494                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2495                      void *cookie, struct ldlm_enqueue_info *einfo,
2496                      struct ptlrpc_request_set *rqset, int async,
2497                      bool speculative)
2498 {
2499         struct obd_device *obd = exp->exp_obd;
2500         struct lustre_handle lockh = { 0 };
2501         struct ptlrpc_request *req = NULL;
2502         int intent = *flags & LDLM_FL_HAS_INTENT;
2503         __u64 match_flags = *flags;
2504         enum ldlm_mode mode;
2505         int rc;
2506         ENTRY;
2507
2508         /* Filesystem lock extents are extended to page boundaries so that
2509          * dealing with the page cache is a little smoother.  */
2510         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2511         policy->l_extent.end |= ~PAGE_MASK;
2512
2513         /* Next, search for already existing extent locks that will cover us */
2514         /* If we're trying to read, we also search for an existing PW lock.  The
2515          * VFS and page cache already protect us locally, so lots of readers/
2516          * writers can share a single PW lock.
2517          *
2518          * There are problems with conversion deadlocks, so instead of
2519          * converting a read lock to a write lock, we'll just enqueue a new
2520          * one.
2521          *
2522          * At some point we should cancel the read lock instead of making them
2523          * send us a blocking callback, but there are problems with canceling
2524          * locks out from other users right now, too. */
2525         mode = einfo->ei_mode;
2526         if (einfo->ei_mode == LCK_PR)
2527                 mode |= LCK_PW;
2528         /* Normal lock requests must wait for the LVB to be ready before
2529          * matching a lock; speculative lock requests do not need to,
2530          * because they will not actually use the lock. */
2531         if (!speculative)
2532                 match_flags |= LDLM_FL_LVB_READY;
2533         if (intent != 0)
2534                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2535         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2536                                einfo->ei_type, policy, mode, &lockh, 0);
2537         if (mode) {
2538                 struct ldlm_lock *matched;
2539
2540                 if (*flags & LDLM_FL_TEST_LOCK)
2541                         RETURN(ELDLM_OK);
2542
2543                 matched = ldlm_handle2lock(&lockh);
2544                 if (speculative) {
2545                         /* This DLM lock request is speculative, and does not
2546                          * have an associated IO request. Therefore if there
2547                          * is already a DLM lock, it wll just inform the
2548                          * caller to cancel the request for this stripe.*/
2549                         lock_res_and_lock(matched);
2550                         if (ldlm_extent_equal(&policy->l_extent,
2551                             &matched->l_policy_data.l_extent))
2552                                 rc = -EEXIST;
2553                         else
2554                                 rc = -ECANCELED;
2555                         unlock_res_and_lock(matched);
2556
2557                         ldlm_lock_decref(&lockh, mode);
2558                         LDLM_LOCK_PUT(matched);
2559                         RETURN(rc);
2560                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2561                         *flags |= LDLM_FL_LVB_READY;
2562
2563                         /* We already have a lock, and it's referenced. */
2564                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2565
2566                         ldlm_lock_decref(&lockh, mode);
2567                         LDLM_LOCK_PUT(matched);
2568                         RETURN(ELDLM_OK);
2569                 } else {
2570                         ldlm_lock_decref(&lockh, mode);
2571                         LDLM_LOCK_PUT(matched);
2572                 }
2573         }
2574
2575         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2576                 RETURN(-ENOLCK);
2577
2578         if (intent) {
2579                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2580                                            &RQF_LDLM_ENQUEUE_LVB);
2581                 if (req == NULL)
2582                         RETURN(-ENOMEM);
2583
2584                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2585                 if (rc) {
2586                         ptlrpc_request_free(req);
2587                         RETURN(rc);
2588                 }
2589
2590                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2591                                      sizeof *lvb);
2592                 ptlrpc_request_set_replen(req);
2593         }
2594
2595         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2596         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2597
2598         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2599                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2600         if (async) {
2601                 if (!rc) {
2602                         struct osc_enqueue_args *aa;
2603                         aa = ptlrpc_req_async_args(aa, req);
2604                         aa->oa_exp         = exp;
2605                         aa->oa_mode        = einfo->ei_mode;
2606                         aa->oa_type        = einfo->ei_type;
2607                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2608                         aa->oa_upcall      = upcall;
2609                         aa->oa_cookie      = cookie;
2610                         aa->oa_speculative = speculative;
2611                         if (!speculative) {
2612                                 aa->oa_flags  = flags;
2613                                 aa->oa_lvb    = lvb;
2614                         } else {
2615                                 /* speculative locks are essentially to enqueue
2616                                  * a DLM lock  in advance, so we don't care
2617                                  * about the result of the enqueue. */
2618                                 aa->oa_lvb    = NULL;
2619                                 aa->oa_flags  = NULL;
2620                         }
2621
2622                         req->rq_interpret_reply = osc_enqueue_interpret;
2623                         if (rqset == PTLRPCD_SET)
2624                                 ptlrpcd_add_req(req);
2625                         else
2626                                 ptlrpc_set_add_req(rqset, req);
2627                 } else if (intent) {
2628                         ptlrpc_req_finished(req);
2629                 }
2630                 RETURN(rc);
2631         }
2632
2633         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2634                               flags, speculative, rc);
2635         if (intent)
2636                 ptlrpc_req_finished(req);
2637
2638         RETURN(rc);
2639 }
2640
2641 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2642                    enum ldlm_type type, union ldlm_policy_data *policy,
2643                    enum ldlm_mode mode, __u64 *flags, void *data,
2644                    struct lustre_handle *lockh, int unref)
2645 {
2646         struct obd_device *obd = exp->exp_obd;
2647         __u64 lflags = *flags;
2648         enum ldlm_mode rc;
2649         ENTRY;
2650
2651         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2652                 RETURN(-EIO);
2653
2654         /* Filesystem lock extents are extended to page boundaries so that
2655          * dealing with the page cache is a little smoother */
2656         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2657         policy->l_extent.end |= ~PAGE_MASK;
2658
2659         /* Next, search for already existing extent locks that will cover us */
2660         /* If we're trying to read, we also search for an existing PW lock.  The
2661          * VFS and page cache already protect us locally, so lots of readers/
2662          * writers can share a single PW lock. */
2663         rc = mode;
2664         if (mode == LCK_PR)
2665                 rc |= LCK_PW;
2666         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2667                              res_id, type, policy, rc, lockh, unref);
2668         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2669                 RETURN(rc);
2670
2671         if (data != NULL) {
2672                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2673
2674                 LASSERT(lock != NULL);
2675                 if (!osc_set_lock_data(lock, data)) {
2676                         ldlm_lock_decref(lockh, rc);
2677                         rc = 0;
2678                 }
2679                 LDLM_LOCK_PUT(lock);
2680         }
2681         RETURN(rc);
2682 }
2683
2684 static int osc_statfs_interpret(const struct lu_env *env,
2685                                 struct ptlrpc_request *req, void *args, int rc)
2686 {
2687         struct osc_async_args *aa = args;
2688         struct obd_statfs *msfs;
2689
2690         ENTRY;
2691         if (rc == -EBADR)
2692                 /*
2693                  * The request has in fact never been sent due to issues at
2694                  * a higher level (LOV).  Exit immediately since the caller
2695                  * is aware of the problem and takes care of the clean up.
2696                  */
2697                 RETURN(rc);
2698
2699         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2700             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2701                 GOTO(out, rc = 0);
2702
2703         if (rc != 0)
2704                 GOTO(out, rc);
2705
2706         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2707         if (msfs == NULL)
2708                 GOTO(out, rc = -EPROTO);
2709
2710         *aa->aa_oi->oi_osfs = *msfs;
2711 out:
2712         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2713
2714         RETURN(rc);
2715 }
2716
2717 static int osc_statfs_async(struct obd_export *exp,
2718                             struct obd_info *oinfo, time64_t max_age,
2719                             struct ptlrpc_request_set *rqset)
2720 {
2721         struct obd_device     *obd = class_exp2obd(exp);
2722         struct ptlrpc_request *req;
2723         struct osc_async_args *aa;
2724         int rc;
2725         ENTRY;
2726
2727         if (obd->obd_osfs_age >= max_age) {
2728                 CDEBUG(D_SUPER,
2729                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2730                        obd->obd_name, &obd->obd_osfs,
2731                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2732                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2733                 spin_lock(&obd->obd_osfs_lock);
2734                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2735                 spin_unlock(&obd->obd_osfs_lock);
2736                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2737                 if (oinfo->oi_cb_up)
2738                         oinfo->oi_cb_up(oinfo, 0);
2739
2740                 RETURN(0);
2741         }
2742
2743         /* We could possibly pass max_age in the request (as an absolute
2744          * timestamp or a "seconds.usec ago") so the target can avoid doing
2745          * extra calls into the filesystem if that isn't necessary (e.g.
2746          * during mount that would help a bit).  Having relative timestamps
2747          * is not so great if request processing is slow, while absolute
2748          * timestamps are not ideal because they need time synchronization. */
2749         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2750         if (req == NULL)
2751                 RETURN(-ENOMEM);
2752
2753         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2754         if (rc) {
2755                 ptlrpc_request_free(req);
2756                 RETURN(rc);
2757         }
2758         ptlrpc_request_set_replen(req);
2759         req->rq_request_portal = OST_CREATE_PORTAL;
2760         ptlrpc_at_set_req_timeout(req);
2761
2762         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2763                 /* procfs requests not want stat in wait for avoid deadlock */
2764                 req->rq_no_resend = 1;
2765                 req->rq_no_delay = 1;
2766         }
2767
2768         req->rq_interpret_reply = osc_statfs_interpret;
2769         aa = ptlrpc_req_async_args(aa, req);
2770         aa->aa_oi = oinfo;
2771
2772         ptlrpc_set_add_req(rqset, req);
2773         RETURN(0);
2774 }
2775
2776 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2777                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2778 {
2779         struct obd_device     *obd = class_exp2obd(exp);
2780         struct obd_statfs     *msfs;
2781         struct ptlrpc_request *req;
2782         struct obd_import     *imp = NULL;
2783         int rc;
2784         ENTRY;
2785
2786
2787         /*Since the request might also come from lprocfs, so we need
2788          *sync this with client_disconnect_export Bug15684*/
2789         down_read(&obd->u.cli.cl_sem);
2790         if (obd->u.cli.cl_import)
2791                 imp = class_import_get(obd->u.cli.cl_import);
2792         up_read(&obd->u.cli.cl_sem);
2793         if (!imp)
2794                 RETURN(-ENODEV);
2795
2796         /* We could possibly pass max_age in the request (as an absolute
2797          * timestamp or a "seconds.usec ago") so the target can avoid doing
2798          * extra calls into the filesystem if that isn't necessary (e.g.
2799          * during mount that would help a bit).  Having relative timestamps
2800          * is not so great if request processing is slow, while absolute
2801          * timestamps are not ideal because they need time synchronization. */
2802         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2803
2804         class_import_put(imp);
2805
2806         if (req == NULL)
2807                 RETURN(-ENOMEM);
2808
2809         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2810         if (rc) {
2811                 ptlrpc_request_free(req);
2812                 RETURN(rc);
2813         }
2814         ptlrpc_request_set_replen(req);
2815         req->rq_request_portal = OST_CREATE_PORTAL;
2816         ptlrpc_at_set_req_timeout(req);
2817
2818         if (flags & OBD_STATFS_NODELAY) {
2819                 /* procfs requests not want stat in wait for avoid deadlock */
2820                 req->rq_no_resend = 1;
2821                 req->rq_no_delay = 1;
2822         }
2823
2824         rc = ptlrpc_queue_wait(req);
2825         if (rc)
2826                 GOTO(out, rc);
2827
2828         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2829         if (msfs == NULL)
2830                 GOTO(out, rc = -EPROTO);
2831
2832         *osfs = *msfs;
2833
2834         EXIT;
2835 out:
2836         ptlrpc_req_finished(req);
2837         return rc;
2838 }
2839
2840 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2841                          void *karg, void __user *uarg)
2842 {
2843         struct obd_device *obd = exp->exp_obd;
2844         struct obd_ioctl_data *data = karg;
2845         int rc = 0;
2846
2847         ENTRY;
2848         if (!try_module_get(THIS_MODULE)) {
2849                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2850                        module_name(THIS_MODULE));
2851                 return -EINVAL;
2852         }
2853         switch (cmd) {
2854         case OBD_IOC_CLIENT_RECOVER:
2855                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2856                                            data->ioc_inlbuf1, 0);
2857                 if (rc > 0)
2858                         rc = 0;
2859                 break;
2860         case IOC_OSC_SET_ACTIVE:
2861                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2862                                               data->ioc_offset);
2863                 break;
2864         default:
2865                 rc = -ENOTTY;
2866                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2867                        obd->obd_name, cmd, current_comm(), rc);
2868                 break;
2869         }
2870
2871         module_put(THIS_MODULE);
2872         return rc;
2873 }
2874
2875 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2876                        u32 keylen, void *key, u32 vallen, void *val,
2877                        struct ptlrpc_request_set *set)
2878 {
2879         struct ptlrpc_request *req;
2880         struct obd_device     *obd = exp->exp_obd;
2881         struct obd_import     *imp = class_exp2cliimp(exp);
2882         char                  *tmp;
2883         int                    rc;
2884         ENTRY;
2885
2886         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2887
2888         if (KEY_IS(KEY_CHECKSUM)) {
2889                 if (vallen != sizeof(int))
2890                         RETURN(-EINVAL);
2891                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2892                 RETURN(0);
2893         }
2894
2895         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2896                 sptlrpc_conf_client_adapt(obd);
2897                 RETURN(0);
2898         }
2899
2900         if (KEY_IS(KEY_FLUSH_CTX)) {
2901                 sptlrpc_import_flush_my_ctx(imp);
2902                 RETURN(0);
2903         }
2904
2905         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2906                 struct client_obd *cli = &obd->u.cli;
2907                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2908                 long target = *(long *)val;
2909
2910                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2911                 *(long *)val -= nr;
2912                 RETURN(0);
2913         }
2914
2915         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2916                 RETURN(-EINVAL);
2917
2918         /* We pass all other commands directly to OST. Since nobody calls osc
2919            methods directly and everybody is supposed to go through LOV, we
2920            assume lov checked invalid values for us.
2921            The only recognised values so far are evict_by_nid and mds_conn.
2922            Even if something bad goes through, we'd get a -EINVAL from OST
2923            anyway. */
2924
2925         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2926                                                 &RQF_OST_SET_GRANT_INFO :
2927                                                 &RQF_OBD_SET_INFO);
2928         if (req == NULL)
2929                 RETURN(-ENOMEM);
2930
2931         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2932                              RCL_CLIENT, keylen);
2933         if (!KEY_IS(KEY_GRANT_SHRINK))
2934                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2935                                      RCL_CLIENT, vallen);
2936         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2937         if (rc) {
2938                 ptlrpc_request_free(req);
2939                 RETURN(rc);
2940         }
2941
2942         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2943         memcpy(tmp, key, keylen);
2944         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2945                                                         &RMF_OST_BODY :
2946                                                         &RMF_SETINFO_VAL);
2947         memcpy(tmp, val, vallen);
2948
2949         if (KEY_IS(KEY_GRANT_SHRINK)) {
2950                 struct osc_grant_args *aa;
2951                 struct obdo *oa;
2952
2953                 aa = ptlrpc_req_async_args(aa, req);
2954                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2955                 if (!oa) {
2956                         ptlrpc_req_finished(req);
2957                         RETURN(-ENOMEM);
2958                 }
2959                 *oa = ((struct ost_body *)val)->oa;
2960                 aa->aa_oa = oa;
2961                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2962         }
2963
2964         ptlrpc_request_set_replen(req);
2965         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2966                 LASSERT(set != NULL);
2967                 ptlrpc_set_add_req(set, req);
2968                 ptlrpc_check_set(NULL, set);
2969         } else {
2970                 ptlrpcd_add_req(req);
2971         }
2972
2973         RETURN(0);
2974 }
2975 EXPORT_SYMBOL(osc_set_info_async);
2976
2977 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2978                   struct obd_device *obd, struct obd_uuid *cluuid,
2979                   struct obd_connect_data *data, void *localdata)
2980 {
2981         struct client_obd *cli = &obd->u.cli;
2982
2983         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2984                 long lost_grant;
2985                 long grant;
2986
2987                 spin_lock(&cli->cl_loi_list_lock);
2988                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2989                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2990                         /* restore ocd_grant_blkbits as client page bits */
2991                         data->ocd_grant_blkbits = PAGE_SHIFT;
2992                         grant += cli->cl_dirty_grant;
2993                 } else {
2994                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2995                 }
2996                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2997                 lost_grant = cli->cl_lost_grant;
2998                 cli->cl_lost_grant = 0;
2999                 spin_unlock(&cli->cl_loi_list_lock);
3000
3001                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3002                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3003                        data->ocd_version, data->ocd_grant, lost_grant);
3004         }
3005
3006         RETURN(0);
3007 }
3008 EXPORT_SYMBOL(osc_reconnect);
3009
3010 int osc_disconnect(struct obd_export *exp)
3011 {
3012         struct obd_device *obd = class_exp2obd(exp);
3013         int rc;
3014
3015         rc = client_disconnect_export(exp);
3016         /**
3017          * Initially we put del_shrink_grant before disconnect_export, but it
3018          * causes the following problem if setup (connect) and cleanup
3019          * (disconnect) are tangled together.
3020          *      connect p1                     disconnect p2
3021          *   ptlrpc_connect_import
3022          *     ...............               class_manual_cleanup
3023          *                                     osc_disconnect
3024          *                                     del_shrink_grant
3025          *   ptlrpc_connect_interrupt
3026          *     osc_init_grant
3027          *   add this client to shrink list
3028          *                                      cleanup_osc
3029          * Bang! grant shrink thread trigger the shrink. BUG18662
3030          */
3031         osc_del_grant_list(&obd->u.cli);
3032         return rc;
3033 }
3034 EXPORT_SYMBOL(osc_disconnect);
3035
3036 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3037                                  struct hlist_node *hnode, void *arg)
3038 {
3039         struct lu_env *env = arg;
3040         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3041         struct ldlm_lock *lock;
3042         struct osc_object *osc = NULL;
3043         ENTRY;
3044
3045         lock_res(res);
3046         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3047                 if (lock->l_ast_data != NULL && osc == NULL) {
3048                         osc = lock->l_ast_data;
3049                         cl_object_get(osc2cl(osc));
3050                 }
3051
3052                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3053                  * by the 2nd round of ldlm_namespace_clean() call in
3054                  * osc_import_event(). */
3055                 ldlm_clear_cleaned(lock);
3056         }
3057         unlock_res(res);
3058
3059         if (osc != NULL) {
3060                 osc_object_invalidate(env, osc);
3061                 cl_object_put(env, osc2cl(osc));
3062         }
3063
3064         RETURN(0);
3065 }
3066 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3067
3068 static int osc_import_event(struct obd_device *obd,
3069                             struct obd_import *imp,
3070                             enum obd_import_event event)
3071 {
3072         struct client_obd *cli;
3073         int rc = 0;
3074
3075         ENTRY;
3076         LASSERT(imp->imp_obd == obd);
3077
3078         switch (event) {
3079         case IMP_EVENT_DISCON: {
3080                 cli = &obd->u.cli;
3081                 spin_lock(&cli->cl_loi_list_lock);
3082                 cli->cl_avail_grant = 0;
3083                 cli->cl_lost_grant = 0;
3084                 spin_unlock(&cli->cl_loi_list_lock);
3085                 break;
3086         }
3087         case IMP_EVENT_INACTIVE: {
3088                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3089                 break;
3090         }
3091         case IMP_EVENT_INVALIDATE: {
3092                 struct ldlm_namespace *ns = obd->obd_namespace;
3093                 struct lu_env         *env;
3094                 __u16                  refcheck;
3095
3096                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3097
3098                 env = cl_env_get(&refcheck);
3099                 if (!IS_ERR(env)) {
3100                         osc_io_unplug(env, &obd->u.cli, NULL);
3101
3102                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3103                                                  osc_ldlm_resource_invalidate,
3104                                                  env, 0);
3105                         cl_env_put(env, &refcheck);
3106
3107                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3108                 } else
3109                         rc = PTR_ERR(env);
3110                 break;
3111         }
3112         case IMP_EVENT_ACTIVE: {
3113                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3114                 break;
3115         }
3116         case IMP_EVENT_OCD: {
3117                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3118
3119                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3120                         osc_init_grant(&obd->u.cli, ocd);
3121
3122                 /* See bug 7198 */
3123                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3124                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3125
3126                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3127                 break;
3128         }
3129         case IMP_EVENT_DEACTIVATE: {
3130                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3131                 break;
3132         }
3133         case IMP_EVENT_ACTIVATE: {
3134                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3135                 break;
3136         }
3137         default:
3138                 CERROR("Unknown import event %d\n", event);
3139                 LBUG();
3140         }
3141         RETURN(rc);
3142 }
3143
3144 /**
3145  * Determine whether the lock can be canceled before replaying the lock
3146  * during recovery, see bug16774 for detailed information.
3147  *
3148  * \retval zero the lock can't be canceled
3149  * \retval other ok to cancel
3150  */
3151 static int osc_cancel_weight(struct ldlm_lock *lock)
3152 {
3153         /*
3154          * Cancel all unused and granted extent lock.
3155          */
3156         if (lock->l_resource->lr_type == LDLM_EXTENT &&