LU-12072 lov: remove KEY_CACHE_SET to simplify the code
fs/lustre-release.git: lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
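
/*
 * Illustrative caller sketch (not part of this file; "my_setattr_upcall"
 * and the cookie value are hypothetical).  A NULL rqset fires and forgets,
 * PTLRPCD_SET queues the request on the ptlrpcd daemons, and any other set
 * leaves sending to the caller:
 *
 *      static int my_setattr_upcall(void *cookie, int rc)
 *      {
 *              CDEBUG(D_INODE, "setattr finished: rc = %d\n", rc);
 *              return rc;
 *      }
 *
 *      rc = osc_setattr_async(exp, oa, my_setattr_upcall, cookie,
 *                             PTLRPCD_SET);
 */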

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
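
/*
 * Illustrative sketch (hypothetical caller and values): to send a single
 * LU_LADVISE_WILLNEED hint for the first page, the caller fills one advice
 * entry, so the RMF_OST_LADVISE buffer above is sized to
 * 1 * sizeof(struct lu_ladvise), and a NULL rqset makes the call
 * fire-and-forget:
 *
 *      ladvise_hdr->lah_count = 1;
 *      ladvise_hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLNEED;
 *      ladvise_hdr->lah_advise[0].lla_start = 0;
 *      ladvise_hdr->lah_advise[0].lla_end = PAGE_SIZE;
 *      rc = osc_ladvise_base(exp, oa, ladvise_hdr, NULL, NULL, NULL);
 */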

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
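
/*
 * Illustrative sketch (hypothetical caller; "my_upcall" and "my_cookie" are
 * placeholders): per the "overload" comment in osc_sync_base(), the flush
 * range travels in the size/blocks fields of the obdo, so syncing the first
 * 4 KiB of an object looks roughly like:
 *
 *      oa->o_size = 0;
 *      oa->o_blocks = 4096;
 *      oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *      rc = osc_sync_base(obj, oa, my_upcall, my_cookie, PTLRPCD_SET);
 */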

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa's object id. Found locks are added to the @cancels list. Returns
 * the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return early, i.e. cancel nothing, only if ELC is supported (flag
         * in export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
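
/*
 * Illustrative trace of the optimistic throttle above (hypothetical numbers,
 * cl_max_rpcs_in_flight = 8): the caller increments first and checks the
 * limit afterwards.  With 8 destroys already in flight the increment yields
 * 9 > 8, so the caller backs out via atomic_dec_return().  If a destroy
 * completed between the two atomic operations, the decrement sees a value
 * below the limit and must issue the wake_up() itself, since the completion's
 * wake_up() in osc_destroy_interpret() may have fired before this caller
 * went to sleep.
 */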

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
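
/*
 * Worked example for the o_undirty computation above (hypothetical values):
 * with cl_max_pages_per_rpc = 256, cl_max_rpcs_in_flight = 8, 4 KiB pages,
 * and cl_dirty_max_pages no larger, nrpages = 256 * (8 + 1) = 2304, so the
 * client asks for undirty = 2304 << 12 = 9 MiB of grant, plus extent tax
 * when GRANT_PARAM was negotiated, capped below OBD_MAX_GRANT.
 */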

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
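
/*
 * Worked example (hypothetical values): with cl_max_pages_per_rpc = 256,
 * cl_max_rpcs_in_flight = 8 and 4 KiB pages, one RPC is 1 MiB, so the first
 * shrink targets (8 + 1) * 1 MiB = 9 MiB.  Once avail_grant is at or below
 * that, the target drops to a single RPC's worth, 1 MiB, so grant is
 * returned in two large steps rather than piecemeal.
 */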

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds())
                schedule_delayed_work(&work, msecs_to_jiffies(
                                        (next_shrink - ktime_get_seconds()) *
                                        MSEC_PER_SEC));
        else
                schedule_work(&work.work);
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start the grant work handler for returning grant to the server for idle
 * clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
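
/*
 * Worked example for the chunk alignment in osc_init_grant() above
 * (hypothetical values): with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16,
 * cl_chunkbits = 16, i.e. a chunk is 16 pages.  Then
 *
 *      chunk_mask = ~((1 << (16 - 12)) - 1) = ~0xf
 *
 * and a cl_max_pages_per_rpc of 100 rounds up to (100 + 0xf) & ~0xf = 112
 * pages, a whole number of chunks.
 */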

/* We assume that the reason this OSC got a short read is that it read beyond
 * the end of a stripe file; i.e. Lustre is reading a sparse file via the LOV,
 * and it _knows_ it's reading inside the file, it's just that this stripe
 * never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
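
/*
 * Worked example (hypothetical values): for a 3-page read of 4096-byte pages
 * where the OST returned nob_read = 6000, page 0 is consumed whole (nob_read
 * drops to 1904), EOF lands inside page 1 so its last 4096 - 1904 = 2192
 * bytes are zeroed, and page 2 is zeroed entirely by the second loop.
 */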

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
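
/*
 * Illustrative example: two 4096-byte pages at offsets 0 and 4096 with
 * identical flags merge into a single niobuf because
 * p1->off + p1->count == p2->off; pages at offsets 0 and 8192 do not.
 * Differing flags also prevent a merge, with a warning if the difference
 * lies outside the mask of flags known to be safe to combine.
 */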

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page.
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}
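
/*
 * Illustrative dispatch (hypothetical call): a T10-PI checksum type such as
 * OBD_CKSUM_T10CRC512 makes obd_t10_cksum2dif() return a CRC guard function
 * and 512-byte sectors, so
 *
 *      rc = osc_checksum_bulk_rw(obd_name, OBD_CKSUM_T10CRC512, nob,
 *                                page_count, pga, OST_WRITE, &cksum);
 *
 * takes the t10pi path above, while a plain type such as OBD_CKSUM_ADLER
 * leaves fn == NULL and falls back to hashing the whole bulk in
 * osc_checksum_bulk().
 */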

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * variable oa contains valid o_uid and o_gid in these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid
         * breaking other processing logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);
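
        /*
         * Worked example for the comment above (hypothetical values): with
         * a negotiated 4 MiB ocd_brw_size and a 1 MiB LNET MTU,
         * ptlrpc_prep_bulk_imp() allows up to 4 MiB >> LNET_MTU_BITS = 4
         * bulk MDs, and "max - 1" = 3 is what goes on the wire in the high
         * bits of ioo_max_brw.
         */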
1409
1410         if (short_io_size != 0) {
1411                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1412                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1413                         body->oa.o_flags = 0;
1414                 }
1415                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1416                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1417                        short_io_size);
1418                 if (opc == OST_WRITE) {
1419                         short_io_buf = req_capsule_client_get(pill,
1420                                                               &RMF_SHORT_IO);
1421                         LASSERT(short_io_buf != NULL);
1422                 }
1423         }
1424
1425         LASSERT(page_count > 0);
1426         pg_prev = pga[0];
1427         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1428                 struct brw_page *pg = pga[i];
1429                 int poff = pg->off & ~PAGE_MASK;
1430
1431                 LASSERT(pg->count > 0);
1432                 /* make sure there is no gap in the middle of page array */
1433                 LASSERTF(page_count == 1 ||
1434                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1435                           ergo(i > 0 && i < page_count - 1,
1436                                poff == 0 && pg->count == PAGE_SIZE)   &&
1437                           ergo(i == page_count - 1, poff == 0)),
1438                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1439                          i, page_count, pg, pg->off, pg->count);
1440                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1441                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1442                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1443                          i, page_count,
1444                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1445                          pg_prev->pg, page_private(pg_prev->pg),
1446                          pg_prev->pg->index, pg_prev->off);
1447                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1448                         (pg->flag & OBD_BRW_SRVLOCK));
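                /* Short I/O write: copy the page contents inline into the
                 * request buffer (RMF_SHORT_IO) instead of attaching them to
                 * a bulk descriptor. */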
1449                 if (short_io_size != 0 && opc == OST_WRITE) {
1450                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1451
1452                         LASSERT(short_io_size >= requested_nob + pg->count);
1453                         memcpy(short_io_buf + requested_nob,
1454                                ptr + poff,
1455                                pg->count);
1456                         ll_kunmap_atomic(ptr, KM_USER0);
1457                 } else if (short_io_size == 0) {
1458                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1459                                                          pg->count);
1460                 }
1461                 requested_nob += pg->count;
1462
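                /* Coalesce contiguous pages into one remote niobuf, e.g. two
                 * adjacent 4KB pages at offsets 0 and 4096 become a single
                 * niobuf with rnb_offset 0 and rnb_len 8192 (assuming
                 * can_merge_pages() requires offset contiguity and matching
                 * flags). */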
1463                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1464                         niobuf--;
1465                         niobuf->rnb_len += pg->count;
1466                 } else {
1467                         niobuf->rnb_offset = pg->off;
1468                         niobuf->rnb_len    = pg->count;
1469                         niobuf->rnb_flags  = pg->flag;
1470                 }
1471                 pg_prev = pg;
1472         }
1473
1474         LASSERTF((void *)(niobuf - niocount) ==
1475                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1476                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1477                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1478
1479         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1480         if (resend) {
1481                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1482                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1483                         body->oa.o_flags = 0;
1484                 }
1485                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1486         }
1487
1488         if (osc_should_shrink_grant(cli))
1489                 osc_shrink_grant_local(cli, &body->oa);
1490
1491         /* size[REQ_REC_OFF] is still sizeof(*body) */
1492         if (opc == OST_WRITE) {
1493                 if (cli->cl_checksum &&
1494                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1495                         /* store cl_cksum_type in a local variable since
1496                          * it can be changed via lprocfs */
1497                         enum cksum_types cksum_type = cli->cl_cksum_type;
1498
1499                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1500                                 body->oa.o_flags = 0;
1501
1502                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1503                                                                 cksum_type);
1504                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1505
1506                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1507                                                   requested_nob, page_count,
1508                                                   pga, OST_WRITE,
1509                                                   &body->oa.o_cksum);
1510                         if (rc < 0) {
1511                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1512                                        rc);
1513                                 GOTO(out, rc);
1514                         }
1515                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1516                                body->oa.o_cksum);
1517
1518                         /* save this in 'oa', too, for later checking */
1519                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1520                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1521                                                            cksum_type);
1522                 } else {
1523                         /* clear out the checksum flag, in case this is a
1524                          * resend but cl_checksum is no longer set. b=11238 */
1525                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1526                 }
1527                 oa->o_cksum = body->oa.o_cksum;
1528                 /* 1 RC per niobuf */
1529                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1530                                      sizeof(__u32) * niocount);
1531         } else {
1532                 if (cli->cl_checksum &&
1533                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1534                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1535                                 body->oa.o_flags = 0;
1536                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1537                                 cli->cl_cksum_type);
1538                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1539                 }
1540
1541                 /* The client cksum has already been copied to the wire obdo by
1542                  * the earlier lustre_set_wire_obdo(); when a bulk read is resent
1543                  * due to a cksum error, this allows the server to check and dump
1544                  * the pages on its side */
1545         }
1546         ptlrpc_request_set_replen(req);
1547
1548         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1549         aa = ptlrpc_req_async_args(req);
1550         aa->aa_oa = oa;
1551         aa->aa_requested_nob = requested_nob;
1552         aa->aa_nio_count = niocount;
1553         aa->aa_page_count = page_count;
1554         aa->aa_resends = 0;
1555         aa->aa_ppga = pga;
1556         aa->aa_cli = cli;
1557         INIT_LIST_HEAD(&aa->aa_oaps);
1558
1559         *reqp = req;
1560         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1561         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1562                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1563                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1564         RETURN(0);
1565
1566  out:
1567         ptlrpc_req_finished(req);
1568         RETURN(rc);
1569 }
1570
1571 char dbgcksum_file_name[PATH_MAX];
1572
1573 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1574                                 struct brw_page **pga, __u32 server_cksum,
1575                                 __u32 client_cksum)
1576 {
1577         struct file *filp;
1578         int rc, i;
1579         unsigned int len;
1580         char *buf;
1581
1582         /* only keep a dump of the pages on the first error for a given range
1583          * in the file/fid, not during the resends/retries. */
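        /* Example of a resulting path, with hypothetical FID and checksum
         * values:
         * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-6ba3f370-34d53e21 */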
1584         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1585                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1586                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1587                   libcfs_debug_file_path_arr :
1588                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1591                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1592                  pga[0]->off,
1593                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1594                  client_cksum, server_cksum);
1595         filp = filp_open(dbgcksum_file_name,
1596                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1597         if (IS_ERR(filp)) {
1598                 rc = PTR_ERR(filp);
1599                 if (rc == -EEXIST)
1600                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1601                                "checksum error: rc = %d\n", dbgcksum_file_name,
1602                                rc);
1603                 else
1604                         CERROR("%s: can't open to dump pages with checksum "
1605                                "error: rc = %d\n", dbgcksum_file_name, rc);
1606                 return;
1607         }
1608
1609         for (i = 0; i < page_count; i++) {
1610                 len = pga[i]->count;
1611                 buf = kmap(pga[i]->pg);
1612                 while (len != 0) {
1613                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1614                         if (rc < 0) {
1615                                 CERROR("%s: wanted to write %u but got %d "
1616                                        "error\n", dbgcksum_file_name, len, rc);
1617                                 break;
1618                         }
1619                         len -= rc;
1620                         buf += rc;
1621                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1622                                dbgcksum_file_name, rc);
1623                 }
1624                 kunmap(pga[i]->pg);
1625         }
1626
1627         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1628         if (rc)
1629                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1630         filp_close(filp, NULL);
1631         return;
1632 }
1633
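/* Returns 0 if the client-side and server-side write checksums match, and 1
 * (after logging a console error) if they differ; a nonzero return makes the
 * caller resend the request. */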
1634 static int
1635 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1636                      __u32 client_cksum, __u32 server_cksum,
1637                      struct osc_brw_async_args *aa)
1638 {
1639         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1640         enum cksum_types cksum_type;
1641         obd_dif_csum_fn *fn = NULL;
1642         int sector_size = 0;
1643         __u32 new_cksum;
1644         char *msg;
1645         int rc;
1646
1647         if (server_cksum == client_cksum) {
1648                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1649                 return 0;
1650         }
1651
1652         if (aa->aa_cli->cl_checksum_dump)
1653                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1654                                     server_cksum, client_cksum);
1655
1656         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1657                                            oa->o_flags : 0);
1658
1659         switch (cksum_type) {
1660         case OBD_CKSUM_T10IP512:
1661                 fn = obd_dif_ip_fn;
1662                 sector_size = 512;
1663                 break;
1664         case OBD_CKSUM_T10IP4K:
1665                 fn = obd_dif_ip_fn;
1666                 sector_size = 4096;
1667                 break;
1668         case OBD_CKSUM_T10CRC512:
1669                 fn = obd_dif_crc_fn;
1670                 sector_size = 512;
1671                 break;
1672         case OBD_CKSUM_T10CRC4K:
1673                 fn = obd_dif_crc_fn;
1674                 sector_size = 4096;
1675                 break;
1676         default:
1677                 break;
1678         }
1679
1680         if (fn)
1681                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1682                                              aa->aa_page_count, aa->aa_ppga,
1683                                              OST_WRITE, fn, sector_size,
1684                                              &new_cksum);
1685         else
1686                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1687                                        aa->aa_ppga, OST_WRITE, cksum_type,
1688                                        &new_cksum);
1689
1690         if (rc < 0)
1691                 msg = "failed to calculate the client write checksum";
1692         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1693                 msg = "the server did not use the checksum type specified in "
1694                       "the original request - likely a protocol problem";
1695         else if (new_cksum == server_cksum)
1696                 msg = "changed on the client after we checksummed it - "
1697                       "likely false positive due to mmap IO (bug 11742)";
1698         else if (new_cksum == client_cksum)
1699                 msg = "changed in transit before arrival at OST";
1700         else
1701                 msg = "changed in transit AND doesn't match the original - "
1702                       "likely false positive due to mmap IO (bug 11742)";
1703
1704         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1705                            DFID " object "DOSTID" extent [%llu-%llu], original "
1706                            "client csum %x (type %x), server csum %x (type %x),"
1707                            " client csum now %x\n",
1708                            obd_name, msg, libcfs_nid2str(peer->nid),
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1711                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1712                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1713                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1714                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1715                            client_cksum,
1716                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1717                            server_cksum, cksum_type, new_cksum);
1718         return 1;
1719 }
1720
1721 /* Note: rc enters this function as the number of bytes transferred */
1722 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1723 {
1724         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1725         struct client_obd *cli = aa->aa_cli;
1726         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1727         const struct lnet_process_id *peer =
1728                 &req->rq_import->imp_connection->c_peer;
1729         struct ost_body *body;
1730         u32 client_cksum = 0;
1731         ENTRY;
1732
1733         if (rc < 0 && rc != -EDQUOT) {
1734                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1735                 RETURN(rc);
1736         }
1737
1738         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1739         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1740         if (body == NULL) {
1741                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1742                 RETURN(-EPROTO);
1743         }
1744
1745         /* set/clear the over-quota flag for a uid/gid/projid */
1746         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1747             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1748                 unsigned qid[LL_MAXQUOTAS] = {
1749                                          body->oa.o_uid, body->oa.o_gid,
1750                                          body->oa.o_projid };
1751                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1752                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1753                        body->oa.o_valid, body->oa.o_flags);
1754                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1755                                 body->oa.o_flags);
1756         }
1757
1758         osc_update_grant(cli, body);
1759
1760         if (rc < 0)
1761                 RETURN(rc);
1762
1763         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1764                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1765
1766         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1767                 if (rc > 0) {
1768                         CERROR("Unexpected +ve rc %d\n", rc);
1769                         RETURN(-EPROTO);
1770                 }
1771
1772                 if (req->rq_bulk != NULL &&
1773                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1774                         RETURN(-EAGAIN);
1775
1776                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1777                     check_write_checksum(&body->oa, peer, client_cksum,
1778                                          body->oa.o_cksum, aa))
1779                         RETURN(-EAGAIN);
1780
1781                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1782                                      aa->aa_page_count, aa->aa_ppga);
1783                 GOTO(out, rc);
1784         }
1785
1786         /* The rest of this function executes only for OST_READs */
1787
1788         if (req->rq_bulk == NULL) {
1789                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1790                                           RCL_SERVER);
1791                 LASSERT(rc == req->rq_status);
1792         } else {
1793                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1794                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1795         }
1796         if (rc < 0)
1797                 GOTO(out, rc = -EAGAIN);
1798
1799         if (rc > aa->aa_requested_nob) {
1800                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1801                        aa->aa_requested_nob);
1802                 RETURN(-EPROTO);
1803         }
1804
1805         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1806                 CERROR("Unexpected rc %d (%d transferred)\n",
1807                        rc, req->rq_bulk->bd_nob_transferred);
1808                 RETURN(-EPROTO);
1809         }
1810
1811         if (req->rq_bulk == NULL) {
1812                 /* short io */
1813                 int nob, pg_count, i = 0;
1814                 unsigned char *buf;
1815
1816                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1817                 pg_count = aa->aa_page_count;
1818                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1819                                                    rc);
1820                 nob = rc;
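                /* copy the inline reply buffer out into the destination
                 * pages, at most pg->count bytes per page */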
1821                 while (nob > 0 && pg_count > 0) {
1822                         unsigned char *ptr;
1823                         int count = aa->aa_ppga[i]->count > nob ?
1824                                     nob : aa->aa_ppga[i]->count;
1825
1826                         CDEBUG(D_CACHE, "page %p count %d\n",
1827                                aa->aa_ppga[i]->pg, count);
1828                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1829                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1830                                count);
1831                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1832
1833                         buf += count;
1834                         nob -= count;
1835                         i++;
1836                         pg_count--;
1837                 }
1838         }
1839
1840         if (rc < aa->aa_requested_nob)
1841                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1842
1843         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1844                 static int cksum_counter;
1845                 u32        server_cksum = body->oa.o_cksum;
1846                 char      *via = "";
1847                 char      *router = "";
1848                 enum cksum_types cksum_type;
1849                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1850                         body->oa.o_flags : 0;
1851
1852                 cksum_type = obd_cksum_type_unpack(o_flags);
1853                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1854                                           aa->aa_page_count, aa->aa_ppga,
1855                                           OST_READ, &client_cksum);
1856                 if (rc < 0)
1857                         GOTO(out, rc);
1858
1859                 if (req->rq_bulk != NULL &&
1860                     peer->nid != req->rq_bulk->bd_sender) {
1861                         via = " via ";
1862                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1863                 }
1864
1865                 if (server_cksum != client_cksum) {
1866                         struct ost_body *clbody;
1867                         u32 page_count = aa->aa_page_count;
1868
1869                         clbody = req_capsule_client_get(&req->rq_pill,
1870                                                         &RMF_OST_BODY);
1871                         if (cli->cl_checksum_dump)
1872                                 dump_all_bulk_pages(&clbody->oa, page_count,
1873                                                     aa->aa_ppga, server_cksum,
1874                                                     client_cksum);
1875
1876                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1877                                            "%s%s%s inode "DFID" object "DOSTID
1878                                            " extent [%llu-%llu], client %x, "
1879                                            "server %x, cksum_type %x\n",
1880                                            obd_name,
1881                                            libcfs_nid2str(peer->nid),
1882                                            via, router,
1883                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1884                                                 clbody->oa.o_parent_seq : 0ULL,
1885                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1886                                                 clbody->oa.o_parent_oid : 0,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_ver : 0,
1889                                            POSTID(&body->oa.o_oi),
1890                                            aa->aa_ppga[0]->off,
1891                                            aa->aa_ppga[page_count-1]->off +
1892                                            aa->aa_ppga[page_count-1]->count - 1,
1893                                            client_cksum, server_cksum,
1894                                            cksum_type);
1895                         cksum_counter = 0;
1896                         aa->aa_oa->o_cksum = client_cksum;
1897                         rc = -EAGAIN;
1898                 } else {
1899                         cksum_counter++;
1900                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1901                         rc = 0;
1902                 }
1903         } else if (unlikely(client_cksum)) {
1904                 static int cksum_missed;
1905
1906                 cksum_missed++;
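                /* x & -x == x only when x is a power of two, so this logs on
                 * the 1st, 2nd, 4th, 8th, ... miss to rate-limit the console */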
1907                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1908                         CERROR("Checksum %u requested from %s but not sent\n",
1909                                cksum_missed, libcfs_nid2str(peer->nid));
1910         } else {
1911                 rc = 0;
1912         }
1913 out:
1914         if (rc >= 0)
1915                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1916                                      aa->aa_oa, &body->oa);
1917
1918         RETURN(rc);
1919 }
1920
1921 static int osc_brw_redo_request(struct ptlrpc_request *request,
1922                                 struct osc_brw_async_args *aa, int rc)
1923 {
1924         struct ptlrpc_request *new_req;
1925         struct osc_brw_async_args *new_aa;
1926         struct osc_async_page *oap;
1927         ENTRY;
1928
1929         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1930                   "redo for recoverable error %d", rc);
1931
1932         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1933                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1934                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1935                                   aa->aa_ppga, &new_req, 1);
1936         if (rc)
1937                 RETURN(rc);
1938
1939         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1940                 if (oap->oap_request != NULL) {
1941                         LASSERTF(request == oap->oap_request,
1942                                  "request %p != oap_request %p\n",
1943                                  request, oap->oap_request);
1944                         if (oap->oap_interrupted) {
1945                                 ptlrpc_req_finished(new_req);
1946                                 RETURN(-EINTR);
1947                         }
1948                 }
1949         }
1950         /*
1951          * New request takes over pga and oaps from old request.
1952          * Note that copying a list_head doesn't work, need to move it...
1953          */
1954         aa->aa_resends++;
1955         new_req->rq_interpret_reply = request->rq_interpret_reply;
1956         new_req->rq_async_args = request->rq_async_args;
1957         new_req->rq_commit_cb = request->rq_commit_cb;
1958         /* cap the resend delay to the current request timeout; this is
1959          * similar to what ptlrpc does (see after_reply()) */
1960         if (aa->aa_resends > new_req->rq_timeout)
1961                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1962         else
1963                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
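        /* i.e. the Nth resend is delayed N seconds, capped at the request
         * timeout */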
1964         new_req->rq_generation_set = 1;
1965         new_req->rq_import_generation = request->rq_import_generation;
1966
1967         new_aa = ptlrpc_req_async_args(new_req);
1968
1969         INIT_LIST_HEAD(&new_aa->aa_oaps);
1970         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1971         INIT_LIST_HEAD(&new_aa->aa_exts);
1972         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1973         new_aa->aa_resends = aa->aa_resends;
1974
1975         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1976                 if (oap->oap_request) {
1977                         ptlrpc_req_finished(oap->oap_request);
1978                         oap->oap_request = ptlrpc_request_addref(new_req);
1979                 }
1980         }
1981
1982         /* XXX: This code will run into problems if we ever want to add a
1983          * series of BRW RPCs into a self-defined ptlrpc_request_set and
1984          * wait for all of them to finish.  We should inherit the request
1985          * set from the old request. */
1986         ptlrpcd_add_req(new_req);
1987
1988         DEBUG_REQ(D_INFO, new_req, "new request");
1989         RETURN(0);
1990 }
1991
1992 /*
1993  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1994  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1995  * fine for our small page arrays and doesn't require allocation.  It's an
1996  * insertion sort that swaps elements that are strides apart, shrinking the
1997  * stride down until it's 1 and the array is sorted.
1998  */
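/*
 * Gap-sequence sketch: the first loop below grows the stride as 1, 4, 13,
 * 40, ... (3h + 1), then each pass divides it by 3.  E.g. for num == 100 the
 * passes use strides 40, 13, 4 and finally 1.
 */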
1999 static void sort_brw_pages(struct brw_page **array, int num)
2000 {
2001         int stride, i, j;
2002         struct brw_page *tmp;
2003
2004         if (num == 1)
2005                 return;
2006         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2007                 ;
2008
2009         do {
2010                 stride /= 3;
2011                 for (i = stride ; i < num ; i++) {
2012                         tmp = array[i];
2013                         j = i;
2014                         while (j >= stride && array[j - stride]->off > tmp->off) {
2015                                 array[j] = array[j - stride];
2016                                 j -= stride;
2017                         }
2018                         array[j] = tmp;
2019                 }
2020         } while (stride > 1);
2021 }
2022
2023 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2024 {
2025         LASSERT(ppga != NULL);
2026         OBD_FREE(ppga, sizeof(*ppga) * count);
2027 }
2028
2029 static int brw_interpret(const struct lu_env *env,
2030                          struct ptlrpc_request *req, void *args, int rc)
2031 {
2032         struct osc_brw_async_args *aa = args;
2033         struct osc_extent *ext;
2034         struct osc_extent *tmp;
2035         struct client_obd *cli = aa->aa_cli;
2036         unsigned long transferred = 0;
2037
2038         ENTRY;
2039
2040         rc = osc_brw_fini_request(req, rc);
2041         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2042         /*
2043          * When server returns -EINPROGRESS, client should always retry
2044          * regardless of the number of times the bulk was resent already.
2045          */
2046         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2047                 if (req->rq_import_generation !=
2048                     req->rq_import->imp_generation) {
2049                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2050                                ""DOSTID", rc = %d.\n",
2051                                req->rq_import->imp_obd->obd_name,
2052                                POSTID(&aa->aa_oa->o_oi), rc);
2053                 } else if (rc == -EINPROGRESS ||
2054                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2055                         rc = osc_brw_redo_request(req, aa, rc);
2056                 } else {
2057                         CERROR("%s: too many resent retries for object: "
2058                                "%llu:%llu, rc = %d.\n",
2059                                req->rq_import->imp_obd->obd_name,
2060                                POSTID(&aa->aa_oa->o_oi), rc);
2061                 }
2062
2063                 if (rc == 0)
2064                         RETURN(0);
2065                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2066                         rc = -EIO;
2067         }
2068
2069         if (rc == 0) {
2070                 struct obdo *oa = aa->aa_oa;
2071                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2072                 unsigned long valid = 0;
2073                 struct cl_object *obj;
2074                 struct osc_async_page *last;
2075
2076                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2077                 obj = osc2cl(last->oap_obj);
2078
2079                 cl_object_attr_lock(obj);
2080                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2081                         attr->cat_blocks = oa->o_blocks;
2082                         valid |= CAT_BLOCKS;
2083                 }
2084                 if (oa->o_valid & OBD_MD_FLMTIME) {
2085                         attr->cat_mtime = oa->o_mtime;
2086                         valid |= CAT_MTIME;
2087                 }
2088                 if (oa->o_valid & OBD_MD_FLATIME) {
2089                         attr->cat_atime = oa->o_atime;
2090                         valid |= CAT_ATIME;
2091                 }
2092                 if (oa->o_valid & OBD_MD_FLCTIME) {
2093                         attr->cat_ctime = oa->o_ctime;
2094                         valid |= CAT_CTIME;
2095                 }
2096
2097                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2098                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2099                         loff_t last_off = last->oap_count + last->oap_obj_off +
2100                                 last->oap_page_off;
2101
2102                         /* Change the file size if this is an out-of-quota or
2103                          * direct I/O write and it extends the file size */
2104                         if (loi->loi_lvb.lvb_size < last_off) {
2105                                 attr->cat_size = last_off;
2106                                 valid |= CAT_SIZE;
2107                         }
2108                         /* Extend KMS if it's not a lockless write */
2109                         if (loi->loi_kms < last_off &&
2110                             oap2osc_page(last)->ops_srvlock == 0) {
2111                                 attr->cat_kms = last_off;
2112                                 valid |= CAT_KMS;
2113                         }
2114                 }
2115
2116                 if (valid != 0)
2117                         cl_object_attr_update(env, obj, attr, valid);
2118                 cl_object_attr_unlock(obj);
2119         }
2120         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2121
2122         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2123                 osc_inc_unstable_pages(req);
2124
2125         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2126                 list_del_init(&ext->oe_link);
2127                 osc_extent_finish(env, ext, 1,
2128                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2129         }
2130         LASSERT(list_empty(&aa->aa_exts));
2131         LASSERT(list_empty(&aa->aa_oaps));
2132
2133         transferred = (req->rq_bulk == NULL ? /* short io */
2134                        aa->aa_requested_nob :
2135                        req->rq_bulk->bd_nob_transferred);
2136
2137         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2138         ptlrpc_lprocfs_brw(req, transferred);
2139
2140         spin_lock(&cli->cl_loi_list_lock);
2141         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2142          * is called so we know whether to go to sync BRWs or wait for more
2143          * RPCs to complete */
2144         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2145                 cli->cl_w_in_flight--;
2146         else
2147                 cli->cl_r_in_flight--;
2148         osc_wake_cache_waiters(cli);
2149         spin_unlock(&cli->cl_loi_list_lock);
2150
2151         osc_io_unplug(env, cli, NULL);
2152         RETURN(rc);
2153 }
2154
2155 static void brw_commit(struct ptlrpc_request *req)
2156 {
2157         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2158          * this function being called via rq_commit_cb, we need to ensure
2159          * that osc_dec_unstable_pages is still called.  Otherwise unstable
2160          * pages may be leaked. */
2161         spin_lock(&req->rq_lock);
2162         if (likely(req->rq_unstable)) {
2163                 req->rq_unstable = 0;
2164                 spin_unlock(&req->rq_lock);
2165
2166                 osc_dec_unstable_pages(req);
2167         } else {
2168                 req->rq_committed = 1;
2169                 spin_unlock(&req->rq_lock);
2170         }
2171 }
2172
2173 /**
2174  * Build an RPC from the list of extents @ext_list.  The caller must ensure
2175  * that the total number of pages in this list does not exceed the maximum
2176  * number of pages per RPC.  Extents in the list must be in OES_RPC state.
2177  */
2178 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2179                   struct list_head *ext_list, int cmd)
2180 {
2181         struct ptlrpc_request           *req = NULL;
2182         struct osc_extent               *ext;
2183         struct brw_page                 **pga = NULL;
2184         struct osc_brw_async_args       *aa = NULL;
2185         struct obdo                     *oa = NULL;
2186         struct osc_async_page           *oap;
2187         struct osc_object               *obj = NULL;
2188         struct cl_req_attr              *crattr = NULL;
2189         loff_t                          starting_offset = OBD_OBJECT_EOF;
2190         loff_t                          ending_offset = 0;
2191         int                             mpflag = 0;
2192         int                             mem_tight = 0;
2193         int                             page_count = 0;
2194         bool                            soft_sync = false;
2195         bool                            interrupted = false;
2196         bool                            ndelay = false;
2197         int                             i;
2198         int                             grant = 0;
2199         int                             rc;
2200         __u32                           layout_version = 0;
2201         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2202         struct ost_body                 *body;
2203         ENTRY;
2204         LASSERT(!list_empty(ext_list));
2205
2206         /* add pages into rpc_list to build BRW rpc */
2207         list_for_each_entry(ext, ext_list, oe_link) {
2208                 LASSERT(ext->oe_state == OES_RPC);
2209                 mem_tight |= ext->oe_memalloc;
2210                 grant += ext->oe_grants;
2211                 page_count += ext->oe_nr_pages;
2212                 layout_version = MAX(layout_version, ext->oe_layout_version);
2213                 if (obj == NULL)
2214                         obj = ext->oe_obj;
2215         }
2216
2217         soft_sync = osc_over_unstable_soft_limit(cli);
2218         if (mem_tight)
2219                 mpflag = cfs_memory_pressure_get_and_set();
2220
2221         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2222         if (pga == NULL)
2223                 GOTO(out, rc = -ENOMEM);
2224
2225         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2226         if (oa == NULL)
2227                 GOTO(out, rc = -ENOMEM);
2228
2229         i = 0;
2230         list_for_each_entry(ext, ext_list, oe_link) {
2231                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2232                         if (mem_tight)
2233                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2234                         if (soft_sync)
2235                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2236                         pga[i] = &oap->oap_brw_page;
2237                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2238                         i++;
2239
2240                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2241                         if (starting_offset == OBD_OBJECT_EOF ||
2242                             starting_offset > oap->oap_obj_off)
2243                                 starting_offset = oap->oap_obj_off;
2244                         else
2245                                 LASSERT(oap->oap_page_off == 0);
2246                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2247                                 ending_offset = oap->oap_obj_off +
2248                                                 oap->oap_count;
2249                         else
2250                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2251                                         PAGE_SIZE);
2252                         if (oap->oap_interrupted)
2253                                 interrupted = true;
2254                 }
2255                 if (ext->oe_ndelay)
2256                         ndelay = true;
2257         }
2258
2259         /* first page in the list */
2260         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2261
2262         crattr = &osc_env_info(env)->oti_req_attr;
2263         memset(crattr, 0, sizeof(*crattr));
2264         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2265         crattr->cra_flags = ~0ULL;
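        /* ~0ULL asks cl_req_attr_set() to fill every available attribute;
         * only the timestamp flags are refreshed again further below */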
2266         crattr->cra_page = oap2cl_page(oap);
2267         crattr->cra_oa = oa;
2268         cl_req_attr_set(env, osc2cl(obj), crattr);
2269
2270         if (cmd == OBD_BRW_WRITE) {
2271                 oa->o_grant_used = grant;
2272                 if (layout_version > 0) {
2273                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2274                                PFID(&oa->o_oi.oi_fid), layout_version);
2275
2276                         oa->o_layout_version = layout_version;
2277                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2278                 }
2279         }
2280
2281         sort_brw_pages(pga, page_count);
2282         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2283         if (rc != 0) {
2284                 CERROR("prep_req failed: %d\n", rc);
2285                 GOTO(out, rc);
2286         }
2287
2288         req->rq_commit_cb = brw_commit;
2289         req->rq_interpret_reply = brw_interpret;
2290         req->rq_memalloc = mem_tight != 0;
2291         oap->oap_request = ptlrpc_request_addref(req);
2292         if (interrupted && !req->rq_intr)
2293                 ptlrpc_mark_interrupted(req);
2294         if (ndelay) {
2295                 req->rq_no_resend = req->rq_no_delay = 1;
2296                 /* We should probably set a shorter timeout value here, so
2297                  * that ETIMEDOUT is handled correctly in brw_interpret(). */
2298                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2299         }
2300
2301         /* Need to update the timestamps after the request is built in case
2302          * we race with setattr (locally or in queue at the OST).  If the OST
2303          * gets the later setattr before the earlier BRW (as determined by the
2304          * request xid), it will not use the BRW timestamps.  Sadly, there is
2305          * no obvious way to do this in a single call.  bug 10150 */
2306         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2307         crattr->cra_oa = &body->oa;
2308         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2309         cl_req_attr_set(env, osc2cl(obj), crattr);
2310         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2311
2312         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2313         aa = ptlrpc_req_async_args(req);
2314         INIT_LIST_HEAD(&aa->aa_oaps);
2315         list_splice_init(&rpc_list, &aa->aa_oaps);
2316         INIT_LIST_HEAD(&aa->aa_exts);
2317         list_splice_init(ext_list, &aa->aa_exts);
2318
2319         spin_lock(&cli->cl_loi_list_lock);
2320         starting_offset >>= PAGE_SHIFT;
2321         if (cmd == OBD_BRW_READ) {
2322                 cli->cl_r_in_flight++;
2323                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2324                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2325                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2326                                       starting_offset + 1);
2327         } else {
2328                 cli->cl_w_in_flight++;
2329                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2330                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2331                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2332                                       starting_offset + 1);
2333         }
2334         spin_unlock(&cli->cl_loi_list_lock);
2335
2336         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2337                   page_count, aa, cli->cl_r_in_flight,
2338                   cli->cl_w_in_flight);
2339         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2340
2341         ptlrpcd_add_req(req);
2342         rc = 0;
2343         EXIT;
2344
2345 out:
2346         if (mem_tight != 0)
2347                 cfs_memory_pressure_restore(mpflag);
2348
2349         if (rc != 0) {
2350                 LASSERT(req == NULL);
2351
2352                 if (oa)
2353                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2354                 if (pga)
2355                         OBD_FREE(pga, sizeof(*pga) * page_count);
2356                 /* this should happen rarely and is pretty bad; it makes the
2357                  * pending list not follow the dirty order */
2358                 while (!list_empty(ext_list)) {
2359                         ext = list_entry(ext_list->next, struct osc_extent,
2360                                          oe_link);
2361                         list_del_init(&ext->oe_link);
2362                         osc_extent_finish(env, ext, 0, rc);
2363                 }
2364         }
2365         RETURN(rc);
2366 }
2367
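/* Atomically claim @lock's l_ast_data for @data: set it if unset, and return
 * 1 if l_ast_data now points at @data (either claimed here or already ours),
 * 0 if another user claimed it first. */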
2368 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2369 {
2370         int set = 0;
2371
2372         LASSERT(lock != NULL);
2373
2374         lock_res_and_lock(lock);
2375
2376         if (lock->l_ast_data == NULL)
2377                 lock->l_ast_data = data;
2378         if (lock->l_ast_data == data)
2379                 set = 1;
2380
2381         unlock_res_and_lock(lock);
2382
2383         return set;
2384 }
2385
2386 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2387                      void *cookie, struct lustre_handle *lockh,
2388                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2389                      int errcode)
2390 {
2391         bool intent = *flags & LDLM_FL_HAS_INTENT;
2392         int rc;
2393         ENTRY;
2394
2395         /* The request was created before the ldlm_cli_enqueue() call. */
2396         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2397                 struct ldlm_reply *rep;
2398
2399                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2400                 LASSERT(rep != NULL);
2401
2402                 rep->lock_policy_res1 =
2403                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2404                 if (rep->lock_policy_res1)
2405                         errcode = rep->lock_policy_res1;
2406                 if (!speculative)
2407                         *flags |= LDLM_FL_LVB_READY;
2408         } else if (errcode == ELDLM_OK) {
2409                 *flags |= LDLM_FL_LVB_READY;
2410         }
2411
2412         /* Call the update callback. */
2413         rc = (*upcall)(cookie, lockh, errcode);
2414
2415         /* release the reference taken in ldlm_cli_enqueue() */
2416         if (errcode == ELDLM_LOCK_MATCHED)
2417                 errcode = ELDLM_OK;
2418         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2419                 ldlm_lock_decref(lockh, mode);
2420
2421         RETURN(rc);
2422 }
2423
2424 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2425                           void *args, int rc)
2426 {
2427         struct osc_enqueue_args *aa = args;
2428         struct ldlm_lock *lock;
2429         struct lustre_handle *lockh = &aa->oa_lockh;
2430         enum ldlm_mode mode = aa->oa_mode;
2431         struct ost_lvb *lvb = aa->oa_lvb;
2432         __u32 lvb_len = sizeof(*lvb);
2433         __u64 flags = 0;
2434
2435         ENTRY;
2436
2437         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2438          * be valid. */
2439         lock = ldlm_handle2lock(lockh);
2440         LASSERTF(lock != NULL,
2441                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2442                  lockh->cookie, req, aa);
2443
2444         /* Take an additional reference so that a blocking AST that
2445          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2446          * to arrive after an upcall has been executed by
2447          * osc_enqueue_fini(). */
2448         ldlm_lock_addref(lockh, mode);
2449
2450         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2451         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2452
2453         /* Let the CP AST grant the lock first. */
2454         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2455
2456         if (aa->oa_speculative) {
2457                 LASSERT(aa->oa_lvb == NULL);
2458                 LASSERT(aa->oa_flags == NULL);
2459                 aa->oa_flags = &flags;
2460         }
2461
2462         /* Complete the procedure for obtaining the lock. */
2463         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2464                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2465                                    lockh, rc);
2466         /* Complete osc stuff. */
2467         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2468                               aa->oa_flags, aa->oa_speculative, rc);
2469
2470         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2471
2472         ldlm_lock_decref(lockh, mode);
2473         LDLM_LOCK_PUT(lock);
2474         RETURN(rc);
2475 }
2476
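/* Sentinel request set: callers pass PTLRPCD_SET to have the request handled
 * by ptlrpcd (see osc_enqueue_base()) rather than by a caller-owned set. */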
2477 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2478
2479 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2480  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
2481  * with other synchronous requests, but holding some locks while trying to
2482  * obtain others may take a considerable amount of time in the case of OST
2483  * failure, and a client that does not release locks that other sync requests
2484  * wait on is evicted from the cluster -- such scenarios make life difficult,
2485  * so release locks just after they are obtained. */
2486 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2487                      __u64 *flags, union ldlm_policy_data *policy,
2488                      struct ost_lvb *lvb, int kms_valid,
2489                      osc_enqueue_upcall_f upcall, void *cookie,
2490                      struct ldlm_enqueue_info *einfo,
2491                      struct ptlrpc_request_set *rqset, int async,
2492                      bool speculative)
2493 {
2494         struct obd_device *obd = exp->exp_obd;
2495         struct lustre_handle lockh = { 0 };
2496         struct ptlrpc_request *req = NULL;
2497         int intent = *flags & LDLM_FL_HAS_INTENT;
2498         __u64 match_flags = *flags;
2499         enum ldlm_mode mode;
2500         int rc;
2501         ENTRY;
2502
2503         /* Filesystem lock extents are extended to page boundaries so that
2504          * dealing with the page cache is a little smoother.  */
2505         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2506         policy->l_extent.end |= ~PAGE_MASK;
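        /* e.g. with 4KB pages, a request for bytes [6000, 9000] is widened to
         * the page-aligned extent [4096, 12287] */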
2507
2508         /*
2509          * kms is not valid when either object is completely fresh (so that no
2510          * locks are cached), or object was evicted. In the latter case cached
2511          * lock cannot be used, because it would prime inode state with
2512          * potentially stale LVB.
2513          */
2514         if (!kms_valid)
2515                 goto no_match;
2516
2517         /* Next, search for already existing extent locks that will cover us */
2518         /* If we're trying to read, we also search for an existing PW lock.  The
2519          * VFS and page cache already protect us locally, so lots of readers/
2520          * writers can share a single PW lock.
2521          *
2522          * There are problems with conversion deadlocks, so instead of
2523          * converting a read lock to a write lock, we'll just enqueue a new
2524          * one.
2525          *
2526          * At some point we should cancel the read lock instead of making them
2527          * send us a blocking callback, but there are problems with canceling
2528          * locks out from other users right now, too. */
2529         mode = einfo->ei_mode;
2530         if (einfo->ei_mode == LCK_PR)
2531                 mode |= LCK_PW;
2532         /* Normal lock requests must wait for the LVB to be ready before
2533          * matching a lock; speculative lock requests do not need to,
2534          * because they will not actually use the lock. */
2535         if (!speculative)
2536                 match_flags |= LDLM_FL_LVB_READY;
2537         if (intent != 0)
2538                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2539         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2540                                einfo->ei_type, policy, mode, &lockh, 0);
2541         if (mode) {
2542                 struct ldlm_lock *matched;
2543
2544                 if (*flags & LDLM_FL_TEST_LOCK)
2545                         RETURN(ELDLM_OK);
2546
2547                 matched = ldlm_handle2lock(&lockh);
2548                 if (speculative) {
2549                         /* This DLM lock request is speculative, and does not
2550                          * have an associated IO request.  Therefore, if there
2551                          * is already a DLM lock, it will just inform the
2552                          * caller to cancel the request for this stripe. */
2553                         lock_res_and_lock(matched);
2554                         if (ldlm_extent_equal(&policy->l_extent,
2555                             &matched->l_policy_data.l_extent))
2556                                 rc = -EEXIST;
2557                         else
2558                                 rc = -ECANCELED;
2559                         unlock_res_and_lock(matched);
2560
2561                         ldlm_lock_decref(&lockh, mode);
2562                         LDLM_LOCK_PUT(matched);
2563                         RETURN(rc);
2564                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2565                         *flags |= LDLM_FL_LVB_READY;
2566
2567                         /* We already have a lock, and it's referenced. */
2568                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2569
2570                         ldlm_lock_decref(&lockh, mode);
2571                         LDLM_LOCK_PUT(matched);
2572                         RETURN(ELDLM_OK);
2573                 } else {
2574                         ldlm_lock_decref(&lockh, mode);
2575                         LDLM_LOCK_PUT(matched);
2576                 }
2577         }
2578
2579 no_match:
2580         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2581                 RETURN(-ENOLCK);
2582
2583         if (intent) {
2584                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2585                                            &RQF_LDLM_ENQUEUE_LVB);
2586                 if (req == NULL)
2587                         RETURN(-ENOMEM);
2588
2589                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2590                 if (rc) {
2591                         ptlrpc_request_free(req);
2592                         RETURN(rc);
2593                 }
2594
2595                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2596                                      sizeof(*lvb));
2597                 ptlrpc_request_set_replen(req);
2598         }
2599
2600         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2601         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2602
2603         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2604                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2605         if (async) {
2606                 if (!rc) {
2607                         struct osc_enqueue_args *aa;
2608                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2609                         aa = ptlrpc_req_async_args(req);
2610                         aa->oa_exp         = exp;
2611                         aa->oa_mode        = einfo->ei_mode;
2612                         aa->oa_type        = einfo->ei_type;
2613                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2614                         aa->oa_upcall      = upcall;
2615                         aa->oa_cookie      = cookie;
2616                         aa->oa_speculative = speculative;
2617                         if (!speculative) {
2618                                 aa->oa_flags  = flags;
2619                                 aa->oa_lvb    = lvb;
2620                         } else {
2621                                 /* speculative locks essentially enqueue a
2622                                  * DLM lock in advance, so we don't care
2623                                  * about the result of the enqueue. */
2624                                 aa->oa_lvb    = NULL;
2625                                 aa->oa_flags  = NULL;
2626                         }
2627
2628                         req->rq_interpret_reply = osc_enqueue_interpret;
2629                         if (rqset == PTLRPCD_SET)
2630                                 ptlrpcd_add_req(req);
2631                         else
2632                                 ptlrpc_set_add_req(rqset, req);
2633                 } else if (intent) {
2634                         ptlrpc_req_finished(req);
2635                 }
2636                 RETURN(rc);
2637         }
2638
2639         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2640                               flags, speculative, rc);
2641         if (intent)
2642                 ptlrpc_req_finished(req);
2643
2644         RETURN(rc);
2645 }
2646
2647 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2648                    enum ldlm_type type, union ldlm_policy_data *policy,
2649                    enum ldlm_mode mode, __u64 *flags, void *data,
2650                    struct lustre_handle *lockh, int unref)
2651 {
2652         struct obd_device *obd = exp->exp_obd;
2653         __u64 lflags = *flags;
2654         enum ldlm_mode rc;
2655         ENTRY;
2656
2657         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2658                 RETURN(-EIO);
2659
2660         /* Filesystem lock extents are extended to page boundaries so that
2661          * dealing with the page cache is a little smoother */
2662         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2663         policy->l_extent.end |= ~PAGE_MASK;
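             /* Worked example (assuming 4 KiB pages, PAGE_MASK == ~0xfffUL):
              * the byte range [5000, 6000] is widened to [4096, 8191], i.e.
              * to all of page 1. */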
2664
2665         /* Next, search for already existing extent locks that will cover us */
2666         /* If we're trying to read, we also search for an existing PW lock.  The
2667          * VFS and page cache already protect us locally, so lots of readers/
2668          * writers can share a single PW lock. */
2669         rc = mode;
2670         if (mode == LCK_PR)
2671                 rc |= LCK_PW;
2672         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2673                              res_id, type, policy, rc, lockh, unref);
2674         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2675                 RETURN(rc);
2676
2677         if (data != NULL) {
2678                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2679
2680                 LASSERT(lock != NULL);
2681                 if (!osc_set_lock_data(lock, data)) {
2682                         ldlm_lock_decref(lockh, rc);
2683                         rc = 0;
2684                 }
2685                 LDLM_LOCK_PUT(lock);
2686         }
2687         RETURN(rc);
2688 }
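     /*
      * Hypothetical usage sketch (not from the original file; exp, res_id
      * and osc_obj are assumed to be in scope): a caller looking for an
      * existing lock covering page "index" of an object might do:
      *
      *     union ldlm_policy_data policy = {
      *             .l_extent = { .start = (__u64)index << PAGE_SHIFT,
      *                           .end = (((__u64)index + 1) << PAGE_SHIFT) - 1 },
      *     };
      *     __u64 flags = 0;
      *     struct lustre_handle lockh;
      *     enum ldlm_mode mode;
      *
      *     mode = osc_match_base(exp, &res_id, LDLM_EXTENT, &policy, LCK_PR,
      *                           &flags, osc_obj, &lockh, 0);
      *     if (mode != 0) {
      *             ... use the lock; a "mode" reference is held, which the
      *             caller must drop with ldlm_lock_decref(&lockh, mode) ...
      *     }
      *
      * Note that a PR request may return LCK_PW, since an existing PW lock
      * also covers readers (see the comment above).
      */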
2689
2690 static int osc_statfs_interpret(const struct lu_env *env,
2691                                 struct ptlrpc_request *req, void *args, int rc)
2692 {
2693         struct osc_async_args *aa = args;
2694         struct obd_statfs *msfs;
2695
2696         ENTRY;
2697         if (rc == -EBADR)
2698                 /*
2699                  * The request has in fact never been sent due to issues at
2700                  * a higher level (LOV).  Exit immediately since the caller
2701                  * is aware of the problem and takes care of the clean up.
2702                  */
2703                 RETURN(rc);
2704
2705         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2706             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2707                 GOTO(out, rc = 0);
2708
2709         if (rc != 0)
2710                 GOTO(out, rc);
2711
2712         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2713         if (msfs == NULL)
2714                 GOTO(out, rc = -EPROTO);
2715
2716         *aa->aa_oi->oi_osfs = *msfs;
2717 out:
2718         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719
2720         RETURN(rc);
2721 }
2722
2723 static int osc_statfs_async(struct obd_export *exp,
2724                             struct obd_info *oinfo, time64_t max_age,
2725                             struct ptlrpc_request_set *rqset)
2726 {
2727         struct obd_device     *obd = class_exp2obd(exp);
2728         struct ptlrpc_request *req;
2729         struct osc_async_args *aa;
2730         int rc;
2731         ENTRY;
2732
2733         /* We could possibly pass max_age in the request (as an absolute
2734          * timestamp or a "seconds.usec ago") so the target can avoid doing
2735          * extra calls into the filesystem if that isn't necessary (e.g.
2736          * during mount, where that would help a bit).  Relative timestamps
2737          * are not so great if request processing is slow, while absolute
2738          * timestamps are not ideal because they need time synchronization. */
2739         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2740         if (req == NULL)
2741                 RETURN(-ENOMEM);
2742
2743         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2744         if (rc) {
2745                 ptlrpc_request_free(req);
2746                 RETURN(rc);
2747         }
2748         ptlrpc_request_set_replen(req);
2749         req->rq_request_portal = OST_CREATE_PORTAL;
2750         ptlrpc_at_set_req_timeout(req);
2751
2752         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2753                 /* procfs requests must not wait for statfs, to avoid deadlock */
2754                 req->rq_no_resend = 1;
2755                 req->rq_no_delay = 1;
2756         }
2757
2758         req->rq_interpret_reply = osc_statfs_interpret;
2759         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2760         aa = ptlrpc_req_async_args(req);
2761         aa->aa_oi = oinfo;
2762
2763         ptlrpc_set_add_req(rqset, req);
2764         RETURN(0);
2765 }
2766
2767 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2768                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2769 {
2770         struct obd_device     *obd = class_exp2obd(exp);
2771         struct obd_statfs     *msfs;
2772         struct ptlrpc_request *req;
2773         struct obd_import     *imp = NULL;
2774         int rc;
2775         ENTRY;
2776
2777
2778         /* Since the request might also come from lprocfs, we need to
2779          * synchronize this with client_disconnect_export() (Bug 15684). */
2780         down_read(&obd->u.cli.cl_sem);
2781         if (obd->u.cli.cl_import)
2782                 imp = class_import_get(obd->u.cli.cl_import);
2783         up_read(&obd->u.cli.cl_sem);
2784         if (!imp)
2785                 RETURN(-ENODEV);
2786
2787         /* We could possibly pass max_age in the request (as an absolute
2788          * timestamp or a "seconds.usec ago") so the target can avoid doing
2789          * extra calls into the filesystem if that isn't necessary (e.g.
2790          * during mount, where that would help a bit).  Relative timestamps
2791          * are not so great if request processing is slow, while absolute
2792          * timestamps are not ideal because they need time synchronization. */
2793         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2794
2795         class_import_put(imp);
2796
2797         if (req == NULL)
2798                 RETURN(-ENOMEM);
2799
2800         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2801         if (rc) {
2802                 ptlrpc_request_free(req);
2803                 RETURN(rc);
2804         }
2805         ptlrpc_request_set_replen(req);
2806         req->rq_request_portal = OST_CREATE_PORTAL;
2807         ptlrpc_at_set_req_timeout(req);
2808
2809         if (flags & OBD_STATFS_NODELAY) {
2810                 /* procfs requests must not wait for statfs, to avoid deadlock */
2811                 req->rq_no_resend = 1;
2812                 req->rq_no_delay = 1;
2813         }
2814
2815         rc = ptlrpc_queue_wait(req);
2816         if (rc)
2817                 GOTO(out, rc);
2818
2819         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2820         if (msfs == NULL)
2821                 GOTO(out, rc = -EPROTO);
2822
2823         *osfs = *msfs;
2824
2825         EXIT;
2826 out:
2827         ptlrpc_req_finished(req);
2828         return rc;
2829 }
2830
2831 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2832                          void *karg, void __user *uarg)
2833 {
2834         struct obd_device *obd = exp->exp_obd;
2835         struct obd_ioctl_data *data = karg;
2836         int rc = 0;
2837
2838         ENTRY;
2839         if (!try_module_get(THIS_MODULE)) {
2840                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2841                        module_name(THIS_MODULE));
2842                 return -EINVAL;
2843         }
2844         switch (cmd) {
2845         case OBD_IOC_CLIENT_RECOVER:
2846                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2847                                            data->ioc_inlbuf1, 0);
2848                 if (rc > 0)
2849                         rc = 0;
2850                 break;
2851         case IOC_OSC_SET_ACTIVE:
2852                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2853                                               data->ioc_offset);
2854                 break;
2855         default:
2856                 rc = -ENOTTY;
2857                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2858                        obd->obd_name, cmd, current_comm(), rc);
2859                 break;
2860         }
2861
2862         module_put(THIS_MODULE);
2863         return rc;
2864 }
2865
2866 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2867                        u32 keylen, void *key, u32 vallen, void *val,
2868                        struct ptlrpc_request_set *set)
2869 {
2870         struct ptlrpc_request *req;
2871         struct obd_device     *obd = exp->exp_obd;
2872         struct obd_import     *imp = class_exp2cliimp(exp);
2873         char                  *tmp;
2874         int                    rc;
2875         ENTRY;
2876
2877         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2878
2879         if (KEY_IS(KEY_CHECKSUM)) {
2880                 if (vallen != sizeof(int))
2881                         RETURN(-EINVAL);
2882                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2883                 RETURN(0);
2884         }
2885
2886         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2887                 sptlrpc_conf_client_adapt(obd);
2888                 RETURN(0);
2889         }
2890
2891         if (KEY_IS(KEY_FLUSH_CTX)) {
2892                 sptlrpc_import_flush_my_ctx(imp);
2893                 RETURN(0);
2894         }
2895
2896         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2897                 struct client_obd *cli = &obd->u.cli;
2898                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2899                 long target = *(long *)val;
2900
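                     /* Shrink at most half of this OSC's LRU list, capped by
                      * the caller's remaining target; decrementing *val lets
                      * the caller (LOV walking its OSCs, presumably) spread
                      * the rest of the target over other devices. */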
2901                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2902                 *(long *)val -= nr;
2903                 RETURN(0);
2904         }
2905
2906         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2907                 RETURN(-EINVAL);
2908
2909         /* We pass all other commands directly to OST. Since nobody calls osc
2910            methods directly and everybody is supposed to go through LOV, we
2911            assume LOV has validated the values for us.
2912            The only recognised values so far are evict_by_nid and mds_conn.
2913            Even if something bad goes through, we'd get a -EINVAL from OST
2914            anyway. */
2915
2916         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2917                                                 &RQF_OST_SET_GRANT_INFO :
2918                                                 &RQF_OBD_SET_INFO);
2919         if (req == NULL)
2920                 RETURN(-ENOMEM);
2921
2922         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2923                              RCL_CLIENT, keylen);
2924         if (!KEY_IS(KEY_GRANT_SHRINK))
2925                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2926                                      RCL_CLIENT, vallen);
2927         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2928         if (rc) {
2929                 ptlrpc_request_free(req);
2930                 RETURN(rc);
2931         }
2932
2933         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2934         memcpy(tmp, key, keylen);
2935         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2936                                                         &RMF_OST_BODY :
2937                                                         &RMF_SETINFO_VAL);
2938         memcpy(tmp, val, vallen);
2939
2940         if (KEY_IS(KEY_GRANT_SHRINK)) {
2941                 struct osc_grant_args *aa;
2942                 struct obdo *oa;
2943
2944                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2945                 aa = ptlrpc_req_async_args(req);
2946                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2947                 if (!oa) {
2948                         ptlrpc_req_finished(req);
2949                         RETURN(-ENOMEM);
2950                 }
2951                 *oa = ((struct ost_body *)val)->oa;
2952                 aa->aa_oa = oa;
2953                 req->rq_interpret_reply = osc_shrink_grant_interpret;
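                     /* The obdo is copied into slab memory because the request
                      * may outlive the caller's val buffer; it is expected to
                      * be freed in osc_shrink_grant_interpret(). */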
2954         }
2955
2956         ptlrpc_request_set_replen(req);
2957         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2958                 LASSERT(set != NULL);
2959                 ptlrpc_set_add_req(set, req);
2960                 ptlrpc_check_set(NULL, set);
2961         } else {
2962                 ptlrpcd_add_req(req);
2963         }
2964
2965         RETURN(0);
2966 }
2967 EXPORT_SYMBOL(osc_set_info_async);
2968
2969 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2970                   struct obd_device *obd, struct obd_uuid *cluuid,
2971                   struct obd_connect_data *data, void *localdata)
2972 {
2973         struct client_obd *cli = &obd->u.cli;
2974
2975         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2976                 long lost_grant;
2977                 long grant;
2978
2979                 spin_lock(&cli->cl_loi_list_lock);
2980                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2981                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2982                         /* restore ocd_grant_blkbits as client page bits */
2983                         data->ocd_grant_blkbits = PAGE_SHIFT;
2984                         grant += cli->cl_dirty_grant;
2985                 } else {
2986                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2987                 }
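                     /* GNU "?:" below: if no grant is currently held, request
                      * two full BRW RPCs worth of grant as a starting point
                      * (cli_brw_size() being the max bytes per bulk RPC). */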
2988                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2989                 lost_grant = cli->cl_lost_grant;
2990                 cli->cl_lost_grant = 0;
2991                 spin_unlock(&cli->cl_loi_list_lock);
2992
2993                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2994                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2995                        data->ocd_version, data->ocd_grant, lost_grant);
2996         }
2997
2998         RETURN(0);
2999 }
3000 EXPORT_SYMBOL(osc_reconnect);
3001
3002 int osc_disconnect(struct obd_export *exp)
3003 {
3004         struct obd_device *obd = class_exp2obd(exp);
3005         int rc;
3006
3007         rc = client_disconnect_export(exp);
3008         /**
3009          * Initially we put del_shrink_grant before disconnect_export, but it
3010          * causes the following problem if setup (connect) and cleanup
3011          * (disconnect) are tangled together.
3012          *      connect p1                     disconnect p2
3013          *   ptlrpc_connect_import
3014          *     ...............               class_manual_cleanup
3015          *                                     osc_disconnect
3016          *                                     del_shrink_grant
3017          *   ptlrpc_connect_interpret
3018          *     osc_init_grant
3019          *   add this client to shrink list
3020          *                                      cleanup_osc
3021          * Bang! the grant shrink thread triggers the shrink. BUG18662
3022          */
3023         osc_del_grant_list(&obd->u.cli);
3024         return rc;
3025 }
3026 EXPORT_SYMBOL(osc_disconnect);
3027
3028 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3029                                  struct hlist_node *hnode, void *arg)
3030 {
3031         struct lu_env *env = arg;
3032         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3033         struct ldlm_lock *lock;
3034         struct osc_object *osc = NULL;
3035         ENTRY;
3036
3037         lock_res(res);
3038         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3039                 if (lock->l_ast_data != NULL && osc == NULL) {
3040                         osc = lock->l_ast_data;
3041                         cl_object_get(osc2cl(osc));
3042                 }
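                     /* All granted locks on a resource are expected to point
                      * at the same osc_object, so the first l_ast_data found
                      * is enough to identify the object to invalidate. */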
3043
3044                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3045                  * by the 2nd round of ldlm_namespace_clean() call in
3046                  * osc_import_event(). */
3047                 ldlm_clear_cleaned(lock);
3048         }
3049         unlock_res(res);
3050
3051         if (osc != NULL) {
3052                 osc_object_invalidate(env, osc);
3053                 cl_object_put(env, osc2cl(osc));
3054         }
3055
3056         RETURN(0);
3057 }
3058 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3059
3060 static int osc_import_event(struct obd_device *obd,
3061                             struct obd_import *imp,
3062                             enum obd_import_event event)
3063 {
3064         struct client_obd *cli;
3065         int rc = 0;
3066
3067         ENTRY;
3068         LASSERT(imp->imp_obd == obd);
3069
3070         switch (event) {
3071         case IMP_EVENT_DISCON: {
3072                 cli = &obd->u.cli;
3073                 spin_lock(&cli->cl_loi_list_lock);
3074                 cli->cl_avail_grant = 0;
3075                 cli->cl_lost_grant = 0;
3076                 spin_unlock(&cli->cl_loi_list_lock);
3077                 break;
3078         }
3079         case IMP_EVENT_INACTIVE: {
3080                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3081                 break;
3082         }
3083         case IMP_EVENT_INVALIDATE: {
3084                 struct ldlm_namespace *ns = obd->obd_namespace;
3085                 struct lu_env         *env;
3086                 __u16                  refcheck;
3087
3088                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3089
3090                 env = cl_env_get(&refcheck);
3091                 if (!IS_ERR(env)) {
3092                         osc_io_unplug(env, &obd->u.cli, NULL);
3093
3094                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3095                                                  osc_ldlm_resource_invalidate,
3096                                                  env, 0);
3097                         cl_env_put(env, &refcheck);
3098
3099                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3100                 } else {
3101                         rc = PTR_ERR(env);
                     }
3102                 break;
3103         }
3104         case IMP_EVENT_ACTIVE: {
3105                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3106                 break;
3107         }
3108         case IMP_EVENT_OCD: {
3109                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3110
3111                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3112                         osc_init_grant(&obd->u.cli, ocd);
3113
3114                 /* See bug 7198 */
3115                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3116                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3117
3118                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3119                 break;
3120         }
3121         case IMP_EVENT_DEACTIVATE: {
3122                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3123                 break;
3124         }
3125         case IMP_EVENT_ACTIVATE: {
3126                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3127                 break;
3128         }
3129         default:
3130                 CERROR("Unknown import event %d\n", event);
3131                 LBUG();
3132         }
3133         RETURN(rc);
3134 }
3135
3136 /**
3137  * Determine whether the lock can be canceled before replaying the lock
3138  * during recovery, see bug16774 for detailed information.
3139  *
3140  * \retval zero the lock can't be canceled
3141  * \retval other ok to cancel
3142  */
3143 static int osc_cancel_weight(struct ldlm_lock *lock)
3144 {
3145         /*
3146          * Cancel all unused, granted extent locks.
3147          */
3148         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3149             ldlm_is_granted(lock) &&
3150             osc_ldlm_weigh_ast(lock) == 0)
3151                 RETURN(1);
3152
3153         RETURN(0);
3154 }
3155
3156 static int brw_queue_work(const struct lu_env *env, void *data)
3157 {
3158         struct client_obd *cli = data;
3159
3160         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3161
3162         osc_io_unplug(env, cli, NULL);
3163         RETURN(0);
3164 }
3165
3166 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3167 {
3168         struct client_obd *cli = &obd->u.cli;
3169         void *handler;
3170         int rc;
3171
3172         ENTRY;
3173
3174         rc = ptlrpcd_addref();
3175         if (rc)
3176                 RETURN(rc);
3177
3178         rc = client_obd_setup(obd, lcfg);
3179         if (rc)
3180                 GOTO(out_ptlrpcd, rc);
3181
3182
3183         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3184         if (IS_ERR(handler))
3185                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3186         cli->cl_writeback_work = handler;
3187
3188         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3189         if (IS_ERR(handler))
3190                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3191         cli->cl_lru_work = handler;
3192
3193         rc = osc_quota_setup(obd);
3194         if (rc)
3195                 GOTO(out_ptlrpcd_work, rc);
3196
3197         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3198         osc_update_next_shrink(cli);
3199
3200         RETURN(rc);
3201
3202 out_ptlrpcd_work:
3203         if (cli->cl_writeback_work != NULL) {
3204                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3205                 cli->cl_writeback_work = NULL;
3206         }
3207         if (cli->cl_lru_work != NULL) {
3208                 ptlrpcd_destroy_work(cli->cl_lru_work);
3209                 cli->cl_lru_work = NULL;
3210         }
3211         client_obd_cleanup(obd);
3212 out_ptlrpcd:
3213         ptlrpcd_decref();
3214         RETURN(rc);
3215 }
3216 EXPORT_SYMBOL(osc_setup_common);
3217
3218 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3219 {
3220         struct client_obd *cli = &obd->u.cli;
3221         int                adding;
3222         int                added;
3223         int                req_count;
3224         int                rc;
3225
3226         ENTRY;
3227
3228         rc = osc_setup_common(obd, lcfg);
3229         if (rc < 0)
3230                 RETURN(rc);
3231
3232         rc = osc_tunables_init(obd);
3233         if (rc)
3234                 RETURN(rc);
3235
3236         /*
3237          * We try to control the total number of requests with an upper limit,
3238          * osc_reqpool_maxreqcount. There might be some race which will cause
3239          * over-limit allocation, but it is fine.
3240          */
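             /* For example, with the default cl_max_rpcs_in_flight of 8, each
              * new OSC attempts to add 10 requests to the shared pool, clamped
              * so the pool never exceeds osc_reqpool_maxreqcount. */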
3241         req_count = atomic_read(&osc_pool_req_count);
3242         if (req_count < osc_reqpool_maxreqcount) {
3243                 adding = cli->cl_max_rpcs_in_flight + 2;
3244                 if (req_count + adding > osc_reqpool_maxreqcount)
3245                         adding = osc_reqpool_maxreqcount - req_count;
3246
3247                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3248                 atomic_add(added, &osc_pool_req_count);
3249         }
3250
3251         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3252
3253         spin_lock(&osc_shrink_lock);
3254         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3255         spin_unlock(&osc_shrink_lock);
3256         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3257         cli->cl_import->imp_idle_debug = D_HA;
3258
3259         RETURN(0);
3260 }
3261
3262 int osc_precleanup_common(struct obd_device *obd)
3263 {
3264         struct client_obd *cli = &obd->u.cli;
3265         ENTRY;
3266
3267         /* LU-464
3268          * for echo client, export may be on zombie list, wait for
3269          * zombie thread to cull it, because cli.cl_import will be
3270          * cleared in client_disconnect_export():
3271          *   class_export_destroy() -> obd_cleanup() ->
3272          *   echo_device_free() -> echo_client_cleanup() ->
3273          *   obd_disconnect() -> osc_disconnect() ->
3274          *   client_disconnect_export()
3275          */
3276         obd_zombie_barrier();
3277         if (cli->cl_writeback_work) {
3278                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3279                 cli->cl_writeback_work = NULL;
3280         }
3281
3282         if (cli->cl_lru_work) {
3283                 ptlrpcd_destroy_work(cli->cl_lru_work);
3284                 cli->cl_lru_work = NULL;
3285         }
3286
3287         obd_cleanup_client_import(obd);
3288         RETURN(0);
3289 }
3290 EXPORT_SYMBOL(osc_precleanup_common);
3291
3292 static int osc_precleanup(struct obd_device *obd)
3293 {
3294         ENTRY;
3295
3296         osc_precleanup_common(obd);
3297
3298         ptlrpc_lprocfs_unregister_obd(obd);
3299         RETURN(0);
3300 }
3301
3302 int osc_cleanup_common(struct obd_device *obd)
3303 {
3304         struct client_obd *cli = &obd->u.cli;
3305         int rc;
3306
3307         ENTRY;
3308
3309         spin_lock(&osc_shrink_lock);
3310         list_del(&cli->cl_shrink_list);
3311         spin_unlock(&osc_shrink_lock);
3312
3313         /* lru cleanup */
3314         if (cli->cl_cache != NULL) {
3315                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3316                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3317                 list_del_init(&cli->cl_lru_osc);
3318                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3319                 cli->cl_lru_left = NULL;
3320                 cl_cache_decref(cli->cl_cache);
3321                 cli->cl_cache = NULL;
3322         }
3323
3324         /* free memory of osc quota cache */
3325         osc_quota_cleanup(obd);
3326
3327         rc = client_obd_cleanup(obd);
3328
3329         ptlrpcd_decref();
3330         RETURN(rc);
3331 }
3332 EXPORT_SYMBOL(osc_cleanup_common);
3333
3334 static struct obd_ops osc_obd_ops = {
3335         .o_owner                = THIS_MODULE,
3336         .o_setup                = osc_setup,
3337         .o_precleanup           = osc_precleanup,
3338         .o_cleanup              = osc_cleanup_common,
3339         .o_add_conn             = client_import_add_conn,
3340         .o_del_conn             = client_import_del_conn,
3341         .o_connect              = client_connect_import,
3342         .o_reconnect            = osc_reconnect,
3343         .o_disconnect           = osc_disconnect,
3344         .o_statfs               = osc_statfs,
3345         .o_statfs_async         = osc_statfs_async,
3346         .o_create               = osc_create,
3347         .o_destroy              = osc_destroy,
3348         .o_getattr              = osc_getattr,
3349         .o_setattr              = osc_setattr,
3350         .o_iocontrol            = osc_iocontrol,
3351         .o_set_info_async       = osc_set_info_async,
3352         .o_import_event         = osc_import_event,
3353         .o_quotactl             = osc_quotactl,
3354 };
3355
3356 static struct shrinker *osc_cache_shrinker;
3357 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3358 DEFINE_SPINLOCK(osc_shrink_lock);
3359
3360 #ifndef HAVE_SHRINKER_COUNT
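     /* Compat wrapper for kernels whose shrinker API exposes a single
      * ->shrink() callback rather than separate count/scan methods:
      * emulate it by scanning first, then returning the remaining count. */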
3361 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3362 {
3363         struct shrink_control scv = {
3364                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3365                 .gfp_mask   = shrink_param(sc, gfp_mask)
3366         };
3367 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3368         struct shrinker *shrinker = NULL;
3369 #endif
3370
3371         (void)osc_cache_shrink_scan(shrinker, &scv);
3372
3373         return osc_cache_shrink_count(shrinker, &scv);
3374 }
3375 #endif
3376
3377 static int __init osc_init(void)
3378 {
3379         bool enable_proc = true;
3380         struct obd_type *type;
3381         unsigned int reqpool_size;
3382         unsigned int reqsize;
3383         int rc;
3384         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3385                          osc_cache_shrink_count, osc_cache_shrink_scan);
3386         ENTRY;
3387
3388         /* print the address of _any_ initialized kernel symbol from this
3389          * module, to allow debugging with a gdb that doesn't support data
3390          * symbols from modules. */
3391         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3392
3393         rc = lu_kmem_init(osc_caches);
3394         if (rc)
3395                 RETURN(rc);
3396
3397         type = class_search_type(LUSTRE_OSP_NAME);
3398         if (type != NULL && type->typ_procsym != NULL)
3399                 enable_proc = false;
3400
3401         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3402                                  LUSTRE_OSC_NAME, &osc_device_type);
3403         if (rc)
3404                 GOTO(out_kmem, rc);
3405
3406         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3407
3408         /* This would obviously be too much memory; only prevent overflow here */
3409         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3410                 GOTO(out_type, rc = -EINVAL);
3411
3412         reqpool_size = osc_reqpool_mem_max << 20;
3413
3414         reqsize = 1;
3415         while (reqsize < OST_IO_MAXREQSIZE)
3416                 reqsize = reqsize << 1;
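             /* reqsize is now the smallest power of two >= OST_IO_MAXREQSIZE,
              * presumably to line up with power-of-two buffer allocations. */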
3417
3418         /*
3419          * We don't enlarge the request count in OSC pool according to
3420          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3421          * tried after normal allocation has failed, so a small OSC pool
3422          * won't cause much performance degradation in most cases.
3423          */
3424         osc_reqpool_maxreqcount = reqpool_size / reqsize;
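             /* Illustrative numbers: with the default osc_reqpool_mem_max of
              * 5 MiB and a reqsize of, say, 1 MiB, at most 5 requests would
              * be pooled (the real reqsize follows OST_IO_MAXREQSIZE). */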
3425
3426         atomic_set(&osc_pool_req_count, 0);
3427         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3428                                           ptlrpc_add_rqs_to_pool);
3429
3430         if (osc_rq_pool == NULL)
3431                 GOTO(out_type, rc = -ENOMEM);
3432
3433         rc = osc_start_grant_work();
3434         if (rc != 0)
3435                 GOTO(out_req_pool, rc);
3436
3437         RETURN(rc);
3438
3439 out_req_pool:
3440         ptlrpc_free_rq_pool(osc_rq_pool);
3441 out_type:
3442         class_unregister_type(LUSTRE_OSC_NAME);
3443 out_kmem:
3444         lu_kmem_fini(osc_caches);
3445
3446         RETURN(rc);
3447 }
3448
3449 static void __exit osc_exit(void)
3450 {
3451         osc_stop_grant_work();
3452         remove_shrinker(osc_cache_shrinker);
3453         class_unregister_type(LUSTRE_OSC_NAME);
3454         lu_kmem_fini(osc_caches);
3455         ptlrpc_free_rq_pool(osc_rq_pool);
3456 }
3457
3458 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3459 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3460 MODULE_VERSION(LUSTRE_VERSION_STRING);
3461 MODULE_LICENSE("GPL");
3462
3463 module_init(osc_init);
3464 module_exit(osc_exit);