/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/osc_request.c
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

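/* Pack @oa into the OST_BODY buffer of an already-packed request, converting
 * it to the wire format negotiated with this import. */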
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

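/* Fetch the attributes of the object described by @oa from the OST with a
 * synchronous OST_GETATTR RPC and copy the reply back into @oa.  The
 * preferred I/O size is returned in o_blksize. */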
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

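/* Synchronously push the attributes in @oa to the OST with an OST_SETATTR
 * RPC and read the server's view of the object back into @oa. */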
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

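/* Asynchronous counterpart of osc_setattr(): the request is added to @rqset
 * (or handed to ptlrpcd if @rqset is PTLRPCD_SET) and @upcall is invoked
 * with @cookie once the reply has been interpreted.  If @rqset is NULL the
 * request is fired and forgotten and @upcall is never called. */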
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

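/* Create an object on the OST.  As the FID-sequence LASSERT below shows,
 * this path is only reached for echo objects. */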
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

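/* Send an asynchronous OST_PUNCH (truncate) request for the object described
 * by @oa.  The request is always handed to ptlrpcd; @upcall is invoked with
 * @cookie once the reply has been processed. */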
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

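/* Flush object data on the OST with an OST_SYNC RPC.  The start and end of
 * the range to sync travel in the size/blocks fields of @oa (see the comment
 * below); completion is reported through @upcall/@cookie. */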
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel locks matched by @mode in the resource found by
 * @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

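/* Try to reserve a slot for one more in-flight destroy RPC.  Returns 1 if
 * the caller may send the RPC, or 0 if cl_max_rpcs_in_flight has been
 * reached; in the latter case the reservation is backed out again and any
 * waiter that raced with us is woken up. */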
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

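/* Fill the dirty accounting and grant fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so that every bulk RPC also tells the server how much
 * dirty data the client caches and how much more grant it would like. */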
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

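/* Periodic work item: walk all registered clients and shrink grant for up to
 * GRANT_SHRINK_RPC_BATCH of those that should return some, then re-arm the
 * delayed work for the earliest upcoming cl_next_shrink_grant deadline. */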
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must already
         * be 0.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
               "chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

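/* Sanity-check a BRW write reply: verify the per-niobuf return codes and
 * that the number of bytes the bulk moved matches what was requested. */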
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

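/* Two brw_pages can share one niobuf only if they are contiguous in the
 * object and carry compatible flags. */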
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots must be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

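/* Compute a plain (non-T10) bulk checksum over the first @nob bytes of the
 * @pga page array, using the hash algorithm selected by @cksum_type. */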
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

1269
1270 static int osc_checksum_bulk_rw(const char *obd_name,
1271                                 enum cksum_types cksum_type,
1272                                 int nob, size_t pg_count,
1273                                 struct brw_page **pga, int opc,
1274                                 u32 *check_sum)
1275 {
1276         obd_dif_csum_fn *fn = NULL;
1277         int sector_size = 0;
1278         int rc;
1279
1280         ENTRY;
1281         obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1282
1283         if (fn)
1284                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1285                                              opc, fn, sector_size, check_sum);
1286         else
1287                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1288                                        check_sum);
1289
1290         RETURN(rc);
1291 }
1292
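/* Build an OST_READ or OST_WRITE RPC for the pages in @pga: merge contiguous
 * pages into niobufs, attach a bulk descriptor (or inline the data as a
 * short I/O), announce cached/dirty state, and checksum the write payload
 * when checksums are enabled. */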
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the oa
         * passed in contains valid o_uid and o_gid for these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
         * other process logic */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients that sent
         * "0", and also so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);
1410
1411         if (short_io_size != 0) {
1412                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1413                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1414                         body->oa.o_flags = 0;
1415                 }
1416                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1417                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1418                        short_io_size);
1419                 if (opc == OST_WRITE) {
1420                         short_io_buf = req_capsule_client_get(pill,
1421                                                               &RMF_SHORT_IO);
1422                         LASSERT(short_io_buf != NULL);
1423                 }
1424         }
1425
1426         LASSERT(page_count > 0);
1427         pg_prev = pga[0];
1428         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1429                 struct brw_page *pg = pga[i];
1430                 int poff = pg->off & ~PAGE_MASK;
1431
1432                 LASSERT(pg->count > 0);
1433                 /* make sure there is no gap in the middle of page array */
1434                 LASSERTF(page_count == 1 ||
1435                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1436                           ergo(i > 0 && i < page_count - 1,
1437                                poff == 0 && pg->count == PAGE_SIZE)   &&
1438                           ergo(i == page_count - 1, poff == 0)),
1439                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1440                          i, page_count, pg, pg->off, pg->count);
1441                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1442                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1443                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1444                          i, page_count,
1445                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1446                          pg_prev->pg, page_private(pg_prev->pg),
1447                          pg_prev->pg->index, pg_prev->off);
1448                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1449                         (pg->flag & OBD_BRW_SRVLOCK));
1450                 if (short_io_size != 0 && opc == OST_WRITE) {
1451                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1452
1453                         LASSERT(short_io_size >= requested_nob + pg->count);
1454                         memcpy(short_io_buf + requested_nob,
1455                                ptr + poff,
1456                                pg->count);
1457                         ll_kunmap_atomic(ptr, KM_USER0);
1458                 } else if (short_io_size == 0) {
1459                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1460                                                          pg->count);
1461                 }
1462                 requested_nob += pg->count;
1463
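                     /* pages that can_merge_pages() deems mergeable extend the
                      * previous remote niobuf instead of starting a new one,
                      * shrinking the niobuf array actually sent on the wire */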
1464                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1465                         niobuf--;
1466                         niobuf->rnb_len += pg->count;
1467                 } else {
1468                         niobuf->rnb_offset = pg->off;
1469                         niobuf->rnb_len    = pg->count;
1470                         niobuf->rnb_flags  = pg->flag;
1471                 }
1472                 pg_prev = pg;
1473         }
1474
1475         LASSERTF((void *)(niobuf - niocount) ==
1476                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1477                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1478                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1479
1480         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1481         if (resend) {
1482                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1483                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1484                         body->oa.o_flags = 0;
1485                 }
1486                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1487         }
1488
1489         if (osc_should_shrink_grant(cli))
1490                 osc_shrink_grant_local(cli, &body->oa);
1491
1492         /* size[REQ_REC_OFF] is still sizeof(*body) */
1493         if (opc == OST_WRITE) {
1494                 if (cli->cl_checksum &&
1495                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1496                         /* store cl_cksum_type in a local variable since
1497                          * it can be changed via lprocfs */
1498                         enum cksum_types cksum_type = cli->cl_cksum_type;
1499
1500                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1501                                 body->oa.o_flags = 0;
1502
1503                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1504                                                                 cksum_type);
1505                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1506
1507                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1508                                                   requested_nob, page_count,
1509                                                   pga, OST_WRITE,
1510                                                   &body->oa.o_cksum);
1511                         if (rc < 0) {
1512                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1513                                        rc);
1514                                 GOTO(out, rc);
1515                         }
1516                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1517                                body->oa.o_cksum);
1518
1519                         /* save this in 'oa', too, for later checking */
1520                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1521                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1522                                                            cksum_type);
1523                 } else {
1524                         /* clear out the checksum flag, in case this is a
1525                          * resend but cl_checksum is no longer set. b=11238 */
1526                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1527                 }
1528                 oa->o_cksum = body->oa.o_cksum;
1529                 /* 1 RC per niobuf */
1530                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1531                                      sizeof(__u32) * niocount);
1532         } else {
1533                 if (cli->cl_checksum &&
1534                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1535                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1536                                 body->oa.o_flags = 0;
1537                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1538                                 cli->cl_cksum_type);
1539                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1540                 }
1541
1542                 /* The client cksum has already been copied to the wire obdo
1543                  * by the previous lustre_set_wire_obdo(); in case a bulk read
1544                  * is being resent due to a cksum error, this allows the
1545                  * server to check+dump the pages on its side */
1546         }
1547         ptlrpc_request_set_replen(req);
1548
1549         aa = ptlrpc_req_async_args(aa, req);
1550         aa->aa_oa = oa;
1551         aa->aa_requested_nob = requested_nob;
1552         aa->aa_nio_count = niocount;
1553         aa->aa_page_count = page_count;
1554         aa->aa_resends = 0;
1555         aa->aa_ppga = pga;
1556         aa->aa_cli = cli;
1557         INIT_LIST_HEAD(&aa->aa_oaps);
1558
1559         *reqp = req;
1560         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1561         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1562                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1563                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1564         RETURN(0);
1565
1566  out:
1567         ptlrpc_req_finished(req);
1568         RETURN(rc);
1569 }
1570
1571 char dbgcksum_file_name[PATH_MAX];
1572
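     /* Dump every page of a bulk that failed its checksum into a uniquely
      * named file under the debug file path, so the corruption can be
      * examined offline; called when the cl_checksum_dump tunable is set. */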
1573 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1574                                 struct brw_page **pga, __u32 server_cksum,
1575                                 __u32 client_cksum)
1576 {
1577         struct file *filp;
1578         int rc, i;
1579         unsigned int len;
1580         char *buf;
1581
1582         /* only keep a dump of the pages on the first error for a given
1583          * range in the file/fid, not during resends/retries. */
1584         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1585                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1586                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1587                   libcfs_debug_file_path_arr :
1588                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1589                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1590                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1591                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1592                  pga[0]->off,
1593                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1594                  client_cksum, server_cksum);
1595         filp = filp_open(dbgcksum_file_name,
1596                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1597         if (IS_ERR(filp)) {
1598                 rc = PTR_ERR(filp);
1599                 if (rc == -EEXIST)
1600                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1601                                "checksum error: rc = %d\n", dbgcksum_file_name,
1602                                rc);
1603                 else
1604                         CERROR("%s: can't open to dump pages with checksum "
1605                                "error: rc = %d\n", dbgcksum_file_name, rc);
1606                 return;
1607         }
1608
1609         for (i = 0; i < page_count; i++) {
1610                 len = pga[i]->count;
1611                 buf = kmap(pga[i]->pg);
1612                 while (len != 0) {
1613                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1614                         if (rc < 0) {
1615                                 CERROR("%s: wanted to write %u but got %d "
1616                                        "error\n", dbgcksum_file_name, len, rc);
1617                                 break;
1618                         }
1619                         len -= rc;
1620                         buf += rc;
1621                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1622                                dbgcksum_file_name, rc);
1623                 }
1624                 kunmap(pga[i]->pg);
1625         }
1626
1627         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1628         if (rc)
1629                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1630         filp_close(filp, NULL);
1631         return;
1632 }
1633
1634 static int
1635 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1636                      __u32 client_cksum, __u32 server_cksum,
1637                      struct osc_brw_async_args *aa)
1638 {
1639         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1640         enum cksum_types cksum_type;
1641         obd_dif_csum_fn *fn = NULL;
1642         int sector_size = 0;
1643         __u32 new_cksum;
1644         char *msg;
1645         int rc;
1646
1647         if (server_cksum == client_cksum) {
1648                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1649                 return 0;
1650         }
1651
1652         if (aa->aa_cli->cl_checksum_dump)
1653                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1654                                     server_cksum, client_cksum);
1655
1656         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1657                                            oa->o_flags : 0);
1658
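              /* Map the wire checksum type to the matching T10-PI guard
               * function and sector size; non-T10 types leave fn == NULL and
               * are recomputed via osc_checksum_bulk() below. */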
1659         switch (cksum_type) {
1660         case OBD_CKSUM_T10IP512:
1661                 fn = obd_dif_ip_fn;
1662                 sector_size = 512;
1663                 break;
1664         case OBD_CKSUM_T10IP4K:
1665                 fn = obd_dif_ip_fn;
1666                 sector_size = 4096;
1667                 break;
1668         case OBD_CKSUM_T10CRC512:
1669                 fn = obd_dif_crc_fn;
1670                 sector_size = 512;
1671                 break;
1672         case OBD_CKSUM_T10CRC4K:
1673                 fn = obd_dif_crc_fn;
1674                 sector_size = 4096;
1675                 break;
1676         default:
1677                 break;
1678         }
1679
1680         if (fn)
1681                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1682                                              aa->aa_page_count, aa->aa_ppga,
1683                                              OST_WRITE, fn, sector_size,
1684                                              &new_cksum);
1685         else
1686                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1687                                        aa->aa_ppga, OST_WRITE, cksum_type,
1688                                        &new_cksum);
1689
1690         if (rc < 0)
1691                 msg = "failed to calculate the client write checksum";
1692         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1693                 msg = "the server did not use the checksum type specified in "
1694                       "the original request - likely a protocol problem";
1695         else if (new_cksum == server_cksum)
1696                 msg = "changed on the client after we checksummed it - "
1697                       "likely false positive due to mmap IO (bug 11742)";
1698         else if (new_cksum == client_cksum)
1699                 msg = "changed in transit before arrival at OST";
1700         else
1701                 msg = "changed in transit AND doesn't match the original - "
1702                       "likely false positive due to mmap IO (bug 11742)";
1703
1704         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1705                            DFID " object "DOSTID" extent [%llu-%llu], original "
1706                            "client csum %x (type %x), server csum %x (type %x),"
1707                            " client csum now %x\n",
1708                            obd_name, msg, libcfs_nid2str(peer->nid),
1709                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1710                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1711                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1712                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1713                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1714                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1715                            client_cksum,
1716                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1717                            server_cksum, cksum_type, new_cksum);
1718         return 1;
1719 }
1720
1721 /* Note rc enters this function as number of bytes transferred */
1722 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1723 {
1724         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1725         struct client_obd *cli = aa->aa_cli;
1726         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1727         const struct lnet_process_id *peer =
1728                 &req->rq_import->imp_connection->c_peer;
1729         struct ost_body *body;
1730         u32 client_cksum = 0;
1731
1732         ENTRY;
1733
1734         if (rc < 0 && rc != -EDQUOT) {
1735                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1736                 RETURN(rc);
1737         }
1738
1739         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1740         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1741         if (body == NULL) {
1742                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1743                 RETURN(-EPROTO);
1744         }
1745
1746         /* set/clear over quota flag for a uid/gid/projid */
1747         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1748             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1749                 unsigned qid[LL_MAXQUOTAS] = {
1750                                          body->oa.o_uid, body->oa.o_gid,
1751                                          body->oa.o_projid };
1752                 CDEBUG(D_QUOTA,
1753                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1754                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1755                        body->oa.o_valid, body->oa.o_flags);
1756                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1757                                 body->oa.o_flags);
1758         }
1759
1760         osc_update_grant(cli, body);
1761
1762         if (rc < 0)
1763                 RETURN(rc);
1764
1765         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1766                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1767
1768         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1769                 if (rc > 0) {
1770                         CERROR("%s: unexpected positive size %d\n",
1771                                obd_name, rc);
1772                         RETURN(-EPROTO);
1773                 }
1774
1775                 if (req->rq_bulk != NULL &&
1776                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1777                         RETURN(-EAGAIN);
1778
1779                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1780                     check_write_checksum(&body->oa, peer, client_cksum,
1781                                          body->oa.o_cksum, aa))
1782                         RETURN(-EAGAIN);
1783
1784                 rc = check_write_rcs(req, aa->aa_requested_nob,
1785                                      aa->aa_nio_count, aa->aa_page_count,
1786                                      aa->aa_ppga);
1787                 GOTO(out, rc);
1788         }
1789
1790         /* The rest of this function executes only for OST_READs */
1791
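              /* For short I/O the payload travels in the reply itself, so the
               * byte count is the size of the RMF_SHORT_IO reply field; bulk
               * reads instead go through sptlrpc_cli_unwrap_bulk_read(). */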
1792         if (req->rq_bulk == NULL) {
1793                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1794                                           RCL_SERVER);
1795                 LASSERT(rc == req->rq_status);
1796         } else {
1797                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1798                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1799         }
1800         if (rc < 0)
1801                 GOTO(out, rc = -EAGAIN);
1802
1803         if (rc > aa->aa_requested_nob) {
1804                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1805                        rc, aa->aa_requested_nob);
1806                 RETURN(-EPROTO);
1807         }
1808
1809         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1810                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1811                        rc, req->rq_bulk->bd_nob_transferred);
1812                 RETURN(-EPROTO);
1813         }
1814
1815         if (req->rq_bulk == NULL) {
1816                 /* short io */
1817                 int nob, pg_count, i = 0;
1818                 unsigned char *buf;
1819
1820                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1821                 pg_count = aa->aa_page_count;
1822                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1823                                                    rc);
1824                 nob = rc;
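                     /* copy the inline reply data into the brw pages, one
                      * page (or the remaining bytes) at a time */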
1825                 while (nob > 0 && pg_count > 0) {
1826                         unsigned char *ptr;
1827                         int count = aa->aa_ppga[i]->count > nob ?
1828                                     nob : aa->aa_ppga[i]->count;
1829
1830                         CDEBUG(D_CACHE, "page %p count %d\n",
1831                                aa->aa_ppga[i]->pg, count);
1832                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1833                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1834                                count);
1835                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1836
1837                         buf += count;
1838                         nob -= count;
1839                         i++;
1840                         pg_count--;
1841                 }
1842         }
1843
1844         if (rc < aa->aa_requested_nob)
1845                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1846
1847         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1848                 static int cksum_counter;
1849                 u32        server_cksum = body->oa.o_cksum;
1850                 char      *via = "";
1851                 char      *router = "";
1852                 enum cksum_types cksum_type;
1853                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1854                         body->oa.o_flags : 0;
1855
1856                 cksum_type = obd_cksum_type_unpack(o_flags);
1857                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1858                                           aa->aa_page_count, aa->aa_ppga,
1859                                           OST_READ, &client_cksum);
1860                 if (rc < 0)
1861                         GOTO(out, rc);
1862
1863                 if (req->rq_bulk != NULL &&
1864                     peer->nid != req->rq_bulk->bd_sender) {
1865                         via = " via ";
1866                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1867                 }
1868
1869                 if (server_cksum != client_cksum) {
1870                         struct ost_body *clbody;
1871                         u32 page_count = aa->aa_page_count;
1872
1873                         clbody = req_capsule_client_get(&req->rq_pill,
1874                                                         &RMF_OST_BODY);
1875                         if (cli->cl_checksum_dump)
1876                                 dump_all_bulk_pages(&clbody->oa, page_count,
1877                                                     aa->aa_ppga, server_cksum,
1878                                                     client_cksum);
1879
1880                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1881                                            "%s%s%s inode "DFID" object "DOSTID
1882                                            " extent [%llu-%llu], client %x, "
1883                                            "server %x, cksum_type %x\n",
1884                                            obd_name,
1885                                            libcfs_nid2str(peer->nid),
1886                                            via, router,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_seq : 0ULL,
1889                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1890                                                 clbody->oa.o_parent_oid : 0,
1891                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1892                                                 clbody->oa.o_parent_ver : 0,
1893                                            POSTID(&body->oa.o_oi),
1894                                            aa->aa_ppga[0]->off,
1895                                            aa->aa_ppga[page_count-1]->off +
1896                                            aa->aa_ppga[page_count-1]->count - 1,
1897                                            client_cksum, server_cksum,
1898                                            cksum_type);
1899                         cksum_counter = 0;
1900                         aa->aa_oa->o_cksum = client_cksum;
1901                         rc = -EAGAIN;
1902                 } else {
1903                         cksum_counter++;
1904                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1905                         rc = 0;
1906                 }
1907         } else if (unlikely(client_cksum)) {
1908                 static int cksum_missed;
1909
1910                 cksum_missed++;
1911                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1912                         CERROR("%s: checksum %u requested from %s but not sent\n",
1913                                obd_name, cksum_missed,
1914                                libcfs_nid2str(peer->nid));
1915         } else {
1916                 rc = 0;
1917         }
1918 out:
1919         if (rc >= 0)
1920                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1921                                      aa->aa_oa, &body->oa);
1922
1923         RETURN(rc);
1924 }
1925
1926 static int osc_brw_redo_request(struct ptlrpc_request *request,
1927                                 struct osc_brw_async_args *aa, int rc)
1928 {
1929         struct ptlrpc_request *new_req;
1930         struct osc_brw_async_args *new_aa;
1931         struct osc_async_page *oap;
1932         ENTRY;
1933
1934         /* The message below is checked in replay-ost-single.sh test_8ae */
1935         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1936                   "redo for recoverable error %d", rc);
1937
1938         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1939                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1940                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1941                                   aa->aa_ppga, &new_req, 1);
1942         if (rc)
1943                 RETURN(rc);
1944
1945         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1946                 if (oap->oap_request != NULL) {
1947                         LASSERTF(request == oap->oap_request,
1948                                  "request %p != oap_request %p\n",
1949                                  request, oap->oap_request);
1950                         if (oap->oap_interrupted) {
1951                                 ptlrpc_req_finished(new_req);
1952                                 RETURN(-EINTR);
1953                         }
1954                 }
1955         }
1956         /*
1957          * New request takes over pga and oaps from old request.
1958          * Note that copying a list_head doesn't work, need to move it...
1959          */
1960         aa->aa_resends++;
1961         new_req->rq_interpret_reply = request->rq_interpret_reply;
1962         new_req->rq_async_args = request->rq_async_args;
1963         new_req->rq_commit_cb = request->rq_commit_cb;
1964         /* cap resend delay to the current request timeout, this is similar to
1965          * what ptlrpc does (see after_reply()) */
1966         if (aa->aa_resends > new_req->rq_timeout)
1967                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1968         else
1969                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1970         new_req->rq_generation_set = 1;
1971         new_req->rq_import_generation = request->rq_import_generation;
1972
1973         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1974
1975         INIT_LIST_HEAD(&new_aa->aa_oaps);
1976         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1977         INIT_LIST_HEAD(&new_aa->aa_exts);
1978         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1979         new_aa->aa_resends = aa->aa_resends;
1980
1981         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1982                 if (oap->oap_request) {
1983                         ptlrpc_req_finished(oap->oap_request);
1984                         oap->oap_request = ptlrpc_request_addref(new_req);
1985                 }
1986         }
1987
1988         /* XXX: This code will run into problems if we ever support adding
1989          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1990          * waiting for all of them to finish. We should inherit the request
1991          * set from the old request. */
1992         ptlrpcd_add_req(new_req);
1993
1994         DEBUG_REQ(D_INFO, new_req, "new request");
1995         RETURN(0);
1996 }
1997
1998 /*
1999  * ugh, we want disk allocation on the target to happen in offset order.  we'll
2000  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
2001  * fine for our small page arrays and doesn't require allocation.  it's an
2002  * insertion sort that swaps elements that are strides apart, shrinking the
2003  * stride down until it's 1 and the array is sorted.
2004  */
2005 static void sort_brw_pages(struct brw_page **array, int num)
2006 {
2007         int stride, i, j;
2008         struct brw_page *tmp;
2009
2010         if (num == 1)
2011                 return;
2012         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2013                 ;
2014
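              /* e.g. with num == 8 the loop above grows the stride 1 -> 4 -> 13
               * and stops, so the passes below use strides 4 and then 1 -- the
               * classic 3h+1 (Knuth) gap sequence */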
2015         do {
2016                 stride /= 3;
2017                 for (i = stride ; i < num ; i++) {
2018                         tmp = array[i];
2019                         j = i;
2020                         while (j >= stride && array[j - stride]->off > tmp->off) {
2021                                 array[j] = array[j - stride];
2022                                 j -= stride;
2023                         }
2024                         array[j] = tmp;
2025                 }
2026         } while (stride > 1);
2027 }
2028
2029 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2030 {
2031         LASSERT(ppga != NULL);
2032         OBD_FREE(ppga, sizeof(*ppga) * count);
2033 }
2034
2035 static int brw_interpret(const struct lu_env *env,
2036                          struct ptlrpc_request *req, void *args, int rc)
2037 {
2038         struct osc_brw_async_args *aa = args;
2039         struct osc_extent *ext;
2040         struct osc_extent *tmp;
2041         struct client_obd *cli = aa->aa_cli;
2042         unsigned long transferred = 0;
2043
2044         ENTRY;
2045
2046         rc = osc_brw_fini_request(req, rc);
2047         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2048         /*
2049          * When server returns -EINPROGRESS, client should always retry
2050          * regardless of the number of times the bulk was resent already.
2051          */
2052         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2053                 if (req->rq_import_generation !=
2054                     req->rq_import->imp_generation) {
2055                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2056                                ""DOSTID", rc = %d.\n",
2057                                req->rq_import->imp_obd->obd_name,
2058                                POSTID(&aa->aa_oa->o_oi), rc);
2059                 } else if (rc == -EINPROGRESS ||
2060                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2061                         rc = osc_brw_redo_request(req, aa, rc);
2062                 } else {
2063                         CERROR("%s: too many resent retries for object: "
2064                                "%llu:%llu, rc = %d.\n",
2065                                req->rq_import->imp_obd->obd_name,
2066                                POSTID(&aa->aa_oa->o_oi), rc);
2067                 }
2068
2069                 if (rc == 0)
2070                         RETURN(0);
2071                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2072                         rc = -EIO;
2073         }
2074
2075         if (rc == 0) {
2076                 struct obdo *oa = aa->aa_oa;
2077                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2078                 unsigned long valid = 0;
2079                 struct cl_object *obj;
2080                 struct osc_async_page *last;
2081
2082                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2083                 obj = osc2cl(last->oap_obj);
2084
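                     /* fold the attributes returned in the reply obdo into the
                      * cl_object attributes under the object attribute lock */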
2085                 cl_object_attr_lock(obj);
2086                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2087                         attr->cat_blocks = oa->o_blocks;
2088                         valid |= CAT_BLOCKS;
2089                 }
2090                 if (oa->o_valid & OBD_MD_FLMTIME) {
2091                         attr->cat_mtime = oa->o_mtime;
2092                         valid |= CAT_MTIME;
2093                 }
2094                 if (oa->o_valid & OBD_MD_FLATIME) {
2095                         attr->cat_atime = oa->o_atime;
2096                         valid |= CAT_ATIME;
2097                 }
2098                 if (oa->o_valid & OBD_MD_FLCTIME) {
2099                         attr->cat_ctime = oa->o_ctime;
2100                         valid |= CAT_CTIME;
2101                 }
2102
2103                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2104                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2105                         loff_t last_off = last->oap_count + last->oap_obj_off +
2106                                 last->oap_page_off;
2107
2108                         /* Change file size if this is an out of quota or
2109                          * direct IO write and it extends the file size */
2110                         if (loi->loi_lvb.lvb_size < last_off) {
2111                                 attr->cat_size = last_off;
2112                                 valid |= CAT_SIZE;
2113                         }
2114                         /* Extend KMS if it's not a lockless write */
2115                         if (loi->loi_kms < last_off &&
2116                             oap2osc_page(last)->ops_srvlock == 0) {
2117                                 attr->cat_kms = last_off;
2118                                 valid |= CAT_KMS;
2119                         }
2120                 }
2121
2122                 if (valid != 0)
2123                         cl_object_attr_update(env, obj, attr, valid);
2124                 cl_object_attr_unlock(obj);
2125         }
2126         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2127         aa->aa_oa = NULL;
2128
2129         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2130                 osc_inc_unstable_pages(req);
2131
2132         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2133                 list_del_init(&ext->oe_link);
2134                 osc_extent_finish(env, ext, 1,
2135                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2136         }
2137         LASSERT(list_empty(&aa->aa_exts));
2138         LASSERT(list_empty(&aa->aa_oaps));
2139
2140         transferred = (req->rq_bulk == NULL ? /* short io */
2141                        aa->aa_requested_nob :
2142                        req->rq_bulk->bd_nob_transferred);
2143
2144         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2145         ptlrpc_lprocfs_brw(req, transferred);
2146
2147         spin_lock(&cli->cl_loi_list_lock);
2148         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2149          * is called so we know whether to go to sync BRWs or wait for more
2150          * RPCs to complete */
2151         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2152                 cli->cl_w_in_flight--;
2153         else
2154                 cli->cl_r_in_flight--;
2155         osc_wake_cache_waiters(cli);
2156         spin_unlock(&cli->cl_loi_list_lock);
2157
2158         osc_io_unplug(env, cli, NULL);
2159         RETURN(rc);
2160 }
2161
2162 static void brw_commit(struct ptlrpc_request *req)
2163 {
2164         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2165          * this function being called via rq_commit_cb, we need to ensure
2166          * that osc_dec_unstable_pages is still called. Otherwise unstable
2167          * pages may be leaked. */
2168         spin_lock(&req->rq_lock);
2169         if (likely(req->rq_unstable)) {
2170                 req->rq_unstable = 0;
2171                 spin_unlock(&req->rq_lock);
2172
2173                 osc_dec_unstable_pages(req);
2174         } else {
2175                 req->rq_committed = 1;
2176                 spin_unlock(&req->rq_lock);
2177         }
2178 }
2179
2180 /**
2181  * Build an RPC from the list of extents @ext_list. The caller must ensure
2182  * that the total number of pages in this list does NOT exceed the max pages
2183  * per RPC. Extents in the list must be in OES_RPC state.
2184  */
2185 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2186                   struct list_head *ext_list, int cmd)
2187 {
2188         struct ptlrpc_request           *req = NULL;
2189         struct osc_extent               *ext;
2190         struct brw_page                 **pga = NULL;
2191         struct osc_brw_async_args       *aa = NULL;
2192         struct obdo                     *oa = NULL;
2193         struct osc_async_page           *oap;
2194         struct osc_object               *obj = NULL;
2195         struct cl_req_attr              *crattr = NULL;
2196         loff_t                          starting_offset = OBD_OBJECT_EOF;
2197         loff_t                          ending_offset = 0;
2198         int                             mpflag = 0;
2199         int                             mem_tight = 0;
2200         int                             page_count = 0;
2201         bool                            soft_sync = false;
2202         bool                            interrupted = false;
2203         bool                            ndelay = false;
2204         int                             i;
2205         int                             grant = 0;
2206         int                             rc;
2207         __u32                           layout_version = 0;
2208         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2209         struct ost_body                 *body;
2210         ENTRY;
2211         LASSERT(!list_empty(ext_list));
2212
2213         /* add pages into rpc_list to build BRW rpc */
2214         list_for_each_entry(ext, ext_list, oe_link) {
2215                 LASSERT(ext->oe_state == OES_RPC);
2216                 mem_tight |= ext->oe_memalloc;
2217                 grant += ext->oe_grants;
2218                 page_count += ext->oe_nr_pages;
2219                 layout_version = MAX(layout_version, ext->oe_layout_version);
2220                 if (obj == NULL)
2221                         obj = ext->oe_obj;
2222         }
2223
2224         soft_sync = osc_over_unstable_soft_limit(cli);
2225         if (mem_tight)
2226                 mpflag = cfs_memory_pressure_get_and_set();
2227
2228         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2229         if (pga == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2233         if (oa == NULL)
2234                 GOTO(out, rc = -ENOMEM);
2235
2236         i = 0;
2237         list_for_each_entry(ext, ext_list, oe_link) {
2238                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2239                         if (mem_tight)
2240                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2241                         if (soft_sync)
2242                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2243                         pga[i] = &oap->oap_brw_page;
2244                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2245                         i++;
2246
2247                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2248                         if (starting_offset == OBD_OBJECT_EOF ||
2249                             starting_offset > oap->oap_obj_off)
2250                                 starting_offset = oap->oap_obj_off;
2251                         else
2252                                 LASSERT(oap->oap_page_off == 0);
2253                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2254                                 ending_offset = oap->oap_obj_off +
2255                                                 oap->oap_count;
2256                         else
2257                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2258                                         PAGE_SIZE);
2259                         if (oap->oap_interrupted)
2260                                 interrupted = true;
2261                 }
2262                 if (ext->oe_ndelay)
2263                         ndelay = true;
2264         }
2265
2266         /* first page in the list */
2267         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2268
2269         crattr = &osc_env_info(env)->oti_req_attr;
2270         memset(crattr, 0, sizeof(*crattr));
2271         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2272         crattr->cra_flags = ~0ULL;
2273         crattr->cra_page = oap2cl_page(oap);
2274         crattr->cra_oa = oa;
2275         cl_req_attr_set(env, osc2cl(obj), crattr);
2276
2277         if (cmd == OBD_BRW_WRITE) {
2278                 oa->o_grant_used = grant;
2279                 if (layout_version > 0) {
2280                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2281                                PFID(&oa->o_oi.oi_fid), layout_version);
2282
2283                         oa->o_layout_version = layout_version;
2284                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2285                 }
2286         }
2287
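              /* sort the pages by object offset, since we want disk allocation
               * on the target to happen in offset order, then build the BRW
               * request proper */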
2288         sort_brw_pages(pga, page_count);
2289         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2290         if (rc != 0) {
2291                 CERROR("prep_req failed: %d\n", rc);
2292                 GOTO(out, rc);
2293         }
2294
2295         req->rq_commit_cb = brw_commit;
2296         req->rq_interpret_reply = brw_interpret;
2297         req->rq_memalloc = mem_tight != 0;
2298         oap->oap_request = ptlrpc_request_addref(req);
2299         if (interrupted && !req->rq_intr)
2300                 ptlrpc_mark_interrupted(req);
2301         if (ndelay) {
2302                 req->rq_no_resend = req->rq_no_delay = 1;
2303                 /* probably set a shorter timeout value
2304                  * to handle ETIMEDOUT in brw_interpret() correctly. */
2305                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2306         }
2307
2308         /* Need to update the timestamps after the request is built in case
2309          * we race with setattr (locally or in a queue at the OST).  If the
2310          * OST gets a later setattr before an earlier BRW (as determined by
2311          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2312          * there is no obvious way to do this in a single call.  bug 10150 */
2313         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2314         crattr->cra_oa = &body->oa;
2315         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2316         cl_req_attr_set(env, osc2cl(obj), crattr);
2317         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2318
2319         aa = ptlrpc_req_async_args(aa, req);
2320         INIT_LIST_HEAD(&aa->aa_oaps);
2321         list_splice_init(&rpc_list, &aa->aa_oaps);
2322         INIT_LIST_HEAD(&aa->aa_exts);
2323         list_splice_init(ext_list, &aa->aa_exts);
2324
2325         spin_lock(&cli->cl_loi_list_lock);
2326         starting_offset >>= PAGE_SHIFT;
2327         if (cmd == OBD_BRW_READ) {
2328                 cli->cl_r_in_flight++;
2329                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2330                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2331                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2332                                       starting_offset + 1);
2333         } else {
2334                 cli->cl_w_in_flight++;
2335                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2336                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2337                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2338                                       starting_offset + 1);
2339         }
2340         spin_unlock(&cli->cl_loi_list_lock);
2341
2342         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2343                   page_count, aa, cli->cl_r_in_flight,
2344                   cli->cl_w_in_flight);
2345         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2346
2347         ptlrpcd_add_req(req);
2348         rc = 0;
2349         EXIT;
2350
2351 out:
2352         if (mem_tight != 0)
2353                 cfs_memory_pressure_restore(mpflag);
2354
2355         if (rc != 0) {
2356                 LASSERT(req == NULL);
2357
2358                 if (oa)
2359                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2360                 if (pga)
2361                         OBD_FREE(pga, sizeof(*pga) * page_count);
2362                 /* this should happen rarely and is pretty bad; it makes the
2363                  * pending list not follow the dirty order */
2364                 while (!list_empty(ext_list)) {
2365                         ext = list_entry(ext_list->next, struct osc_extent,
2366                                          oe_link);
2367                         list_del_init(&ext->oe_link);
2368                         osc_extent_finish(env, ext, 0, rc);
2369                 }
2370         }
2371         RETURN(rc);
2372 }
2373
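     /* Attach @data to the lock's l_ast_data if it is not set yet; returns 1
      * if the lock now carries @data and 0 if it already carried other data. */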
2374 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2375 {
2376         int set = 0;
2377
2378         LASSERT(lock != NULL);
2379
2380         lock_res_and_lock(lock);
2381
2382         if (lock->l_ast_data == NULL)
2383                 lock->l_ast_data = data;
2384         if (lock->l_ast_data == data)
2385                 set = 1;
2386
2387         unlock_res_and_lock(lock);
2388
2389         return set;
2390 }
2391
2392 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2393                      void *cookie, struct lustre_handle *lockh,
2394                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2395                      int errcode)
2396 {
2397         bool intent = *flags & LDLM_FL_HAS_INTENT;
2398         int rc;
2399         ENTRY;
2400
2401         /* The request was created before ldlm_cli_enqueue call. */
2402         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2403                 struct ldlm_reply *rep;
2404
2405                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2406                 LASSERT(rep != NULL);
2407
2408                 rep->lock_policy_res1 =
2409                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2410                 if (rep->lock_policy_res1)
2411                         errcode = rep->lock_policy_res1;
2412                 if (!speculative)
2413                         *flags |= LDLM_FL_LVB_READY;
2414         } else if (errcode == ELDLM_OK) {
2415                 *flags |= LDLM_FL_LVB_READY;
2416         }
2417
2418         /* Call the update callback. */
2419         rc = (*upcall)(cookie, lockh, errcode);
2420
2421         /* release the reference taken in ldlm_cli_enqueue() */
2422         if (errcode == ELDLM_LOCK_MATCHED)
2423                 errcode = ELDLM_OK;
2424         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2425                 ldlm_lock_decref(lockh, mode);
2426
2427         RETURN(rc);
2428 }
2429
2430 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2431                           void *args, int rc)
2432 {
2433         struct osc_enqueue_args *aa = args;
2434         struct ldlm_lock *lock;
2435         struct lustre_handle *lockh = &aa->oa_lockh;
2436         enum ldlm_mode mode = aa->oa_mode;
2437         struct ost_lvb *lvb = aa->oa_lvb;
2438         __u32 lvb_len = sizeof(*lvb);
2439         __u64 flags = 0;
2440
2441         ENTRY;
2442
2443         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2444          * be valid. */
2445         lock = ldlm_handle2lock(lockh);
2446         LASSERTF(lock != NULL,
2447                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2448                  lockh->cookie, req, aa);
2449
2450         /* Take an additional reference so that a blocking AST that
2451          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2452          * to arrive after an upcall has been executed by
2453          * osc_enqueue_fini(). */
2454         ldlm_lock_addref(lockh, mode);
2455
2456         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2457         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2458
2459         /* Let the CP AST grant the lock first. */
2460         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2461
2462         if (aa->oa_speculative) {
2463                 LASSERT(aa->oa_lvb == NULL);
2464                 LASSERT(aa->oa_flags == NULL);
2465                 aa->oa_flags = &flags;
2466         }
2467
2468         /* Complete obtaining the lock procedure. */
2469         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2470                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2471                                    lockh, rc);
2472         /* Complete osc stuff. */
2473         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2474                               aa->oa_flags, aa->oa_speculative, rc);
2475
2476         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2477
2478         ldlm_lock_decref(lockh, mode);
2479         LDLM_LOCK_PUT(lock);
2480         RETURN(rc);
2481 }
2482
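     /* sentinel value: callers pass PTLRPCD_SET instead of a real request set
      * to have the RPC queued directly to the ptlrpcd daemon */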
2483 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2484
2485 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2486  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2487  * with other synchronous requests; however, keeping some locks and trying to
2488  * obtain others may take a considerable amount of time in the case of OST
2489  * failure, and when other sync requests do not get a lock released by a
2490  * client, the client is evicted from the cluster -- such scenarios make life
2491  * difficult, so we release locks just after they are obtained. */
2492 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2493                      __u64 *flags, union ldlm_policy_data *policy,
2494                      struct ost_lvb *lvb, int kms_valid,
2495                      osc_enqueue_upcall_f upcall, void *cookie,
2496                      struct ldlm_enqueue_info *einfo,
2497                      struct ptlrpc_request_set *rqset, int async,
2498                      bool speculative)
2499 {
2500         struct obd_device *obd = exp->exp_obd;
2501         struct lustre_handle lockh = { 0 };
2502         struct ptlrpc_request *req = NULL;
2503         int intent = *flags & LDLM_FL_HAS_INTENT;
2504         __u64 match_flags = *flags;
2505         enum ldlm_mode mode;
2506         int rc;
2507         ENTRY;
2508
2509         /* Filesystem lock extents are extended to page boundaries so that
2510          * dealing with the page cache is a little smoother.  */
2511         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2512         policy->l_extent.end |= ~PAGE_MASK;
2513
2514         /*
2515          * kms is not valid when either object is completely fresh (so that no
2516          * locks are cached), or object was evicted. In the latter case cached
2517          * lock cannot be used, because it would prime inode state with
2518          * potentially stale LVB.
2519          */
2520         if (!kms_valid)
2521                 goto no_match;
2522
2523         /* Next, search for already existing extent locks that will cover us */
2524         /* If we're trying to read, we also search for an existing PW lock.  The
2525          * VFS and page cache already protect us locally, so lots of readers/
2526          * writers can share a single PW lock.
2527          *
2528          * There are problems with conversion deadlocks, so instead of
2529          * converting a read lock to a write lock, we'll just enqueue a new
2530          * one.
2531          *
2532          * At some point we should cancel the read lock instead of making them
2533          * send us a blocking callback, but there are problems with canceling
2534          * locks out from other users right now, too. */
2535         mode = einfo->ei_mode;
2536         if (einfo->ei_mode == LCK_PR)
2537                 mode |= LCK_PW;
2538         /* Normal lock requests must wait for the LVB to be ready before
2539          * matching a lock; speculative lock requests do not need to,
2540          * because they will not actually use the lock. */
2541         if (!speculative)
2542                 match_flags |= LDLM_FL_LVB_READY;
2543         if (intent != 0)
2544                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2545         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2546                                einfo->ei_type, policy, mode, &lockh, 0);
2547         if (mode) {
2548                 struct ldlm_lock *matched;
2549
2550                 if (*flags & LDLM_FL_TEST_LOCK)
2551                         RETURN(ELDLM_OK);
2552
2553                 matched = ldlm_handle2lock(&lockh);
2554                 if (speculative) {
2555                         /* This DLM lock request is speculative, and does not
2556                          * have an associated IO request. Therefore if there
2557                          * is already a DLM lock, it will just inform the
2558                          * caller to cancel the request for this stripe. */
2559                         lock_res_and_lock(matched);
2560                         if (ldlm_extent_equal(&policy->l_extent,
2561                             &matched->l_policy_data.l_extent))
2562                                 rc = -EEXIST;
2563                         else
2564                                 rc = -ECANCELED;
2565                         unlock_res_and_lock(matched);
2566
2567                         ldlm_lock_decref(&lockh, mode);
2568                         LDLM_LOCK_PUT(matched);
2569                         RETURN(rc);
2570                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2571                         *flags |= LDLM_FL_LVB_READY;
2572
2573                         /* We already have a lock, and it's referenced. */
2574                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2575
2576                         ldlm_lock_decref(&lockh, mode);
2577                         LDLM_LOCK_PUT(matched);
2578                         RETURN(ELDLM_OK);
2579                 } else {
2580                         ldlm_lock_decref(&lockh, mode);
2581                         LDLM_LOCK_PUT(matched);
2582                 }
2583         }
2584
2585 no_match:
2586         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2587                 RETURN(-ENOLCK);
2588
2589         if (intent) {
2590                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2591                                            &RQF_LDLM_ENQUEUE_LVB);
2592                 if (req == NULL)
2593                         RETURN(-ENOMEM);
2594
2595                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2596                 if (rc) {
2597                         ptlrpc_request_free(req);
2598                         RETURN(rc);
2599                 }
2600
2601                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2602                                      sizeof *lvb);
2603                 ptlrpc_request_set_replen(req);
2604         }
2605
2606         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2607         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2608
2609         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2610                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2611         if (async) {
2612                 if (!rc) {
2613                         struct osc_enqueue_args *aa;
2614                         aa = ptlrpc_req_async_args(aa, req);
2615                         aa->oa_exp         = exp;
2616                         aa->oa_mode        = einfo->ei_mode;
2617                         aa->oa_type        = einfo->ei_type;
2618                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2619                         aa->oa_upcall      = upcall;
2620                         aa->oa_cookie      = cookie;
2621                         aa->oa_speculative = speculative;
2622                         if (!speculative) {
2623                                 aa->oa_flags  = flags;
2624                                 aa->oa_lvb    = lvb;
2625                         } else {
2626                                 /* speculative locks essentially enqueue
2627                                  * a DLM lock in advance, so we don't care
2628                                  * about the result of the enqueue. */
2629                                 aa->oa_lvb    = NULL;
2630                                 aa->oa_flags  = NULL;
2631                         }
2632
2633                         req->rq_interpret_reply = osc_enqueue_interpret;
2634                         if (rqset == PTLRPCD_SET)
2635                                 ptlrpcd_add_req(req);
2636                         else
2637                                 ptlrpc_set_add_req(rqset, req);
2638                 } else if (intent) {
2639                         ptlrpc_req_finished(req);
2640                 }
2641                 RETURN(rc);
2642         }
2643
2644         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2645                               flags, speculative, rc);
2646         if (intent)
2647                 ptlrpc_req_finished(req);
2648
2649         RETURN(rc);
2650 }
2651
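/*
 * Look up an already granted extent lock covering @policy in the client
 * namespace, without sending an RPC.  As the code below shows, the extent
 * is first widened to page boundaries, and a PR request may also be
 * satisfied by a PW lock since the VFS and page cache serialize local
 * readers and writers.  On a match with @data set, the lock's l_ast_data
 * is bound via osc_set_lock_data(); if that fails the reference is
 * dropped and 0 is returned.
 */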
2652 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2653                    enum ldlm_type type, union ldlm_policy_data *policy,
2654                    enum ldlm_mode mode, __u64 *flags, void *data,
2655                    struct lustre_handle *lockh, int unref)
2656 {
2657         struct obd_device *obd = exp->exp_obd;
2658         __u64 lflags = *flags;
2659         enum ldlm_mode rc;
2660         ENTRY;
2661
2662         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2663                 RETURN(-EIO);
2664
2665         /* Filesystem lock extents are extended to page boundaries so that
2666          * dealing with the page cache is a little smoother. */
2667         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2668         policy->l_extent.end |= ~PAGE_MASK;
2669
2670         /* Next, search for already existing extent locks that will cover us.
2671          * If we're trying to read, we also search for an existing PW lock.  The
2672          * VFS and page cache already protect us locally, so lots of readers/
2673          * writers can share a single PW lock. */
2674         rc = mode;
2675         if (mode == LCK_PR)
2676                 rc |= LCK_PW;
2677         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2678                              res_id, type, policy, rc, lockh, unref);
2679         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2680                 RETURN(rc);
2681
2682         if (data != NULL) {
2683                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2684
2685                 LASSERT(lock != NULL);
2686                 if (!osc_set_lock_data(lock, data)) {
2687                         ldlm_lock_decref(lockh, rc);
2688                         rc = 0;
2689                 }
2690                 LDLM_LOCK_PUT(lock);
2691         }
2692         RETURN(rc);
2693 }
2694
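/*
 * Completion callback for the async OST_STATFS request built below.
 * -EBADR means the request was never sent (a higher layer cleans up),
 * -ENOTCONN/-EAGAIN are swallowed for OBD_STATFS_NODELAY callers, and on
 * success the server's obd_statfs reply is copied out before the caller's
 * oi_cb_up() upcall is invoked.
 */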
2695 static int osc_statfs_interpret(const struct lu_env *env,
2696                                 struct ptlrpc_request *req, void *args, int rc)
2697 {
2698         struct osc_async_args *aa = args;
2699         struct obd_statfs *msfs;
2700
2701         ENTRY;
2702         if (rc == -EBADR)
2703                 /*
2704                  * The request has in fact never been sent due to issues at
2705                  * a higher level (LOV).  Exit immediately since the caller
2706                  * is aware of the problem and takes care of the clean up.
2707                  */
2708                 RETURN(rc);
2709
2710         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2711             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2712                 GOTO(out, rc = 0);
2713
2714         if (rc != 0)
2715                 GOTO(out, rc);
2716
2717         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2718         if (msfs == NULL)
2719                 GOTO(out, rc = -EPROTO);
2720
2721         *aa->aa_oi->oi_osfs = *msfs;
2722 out:
2723         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2724
2725         RETURN(rc);
2726 }
2727
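/*
 * Send an OST_STATFS request without blocking; the result is delivered
 * through oinfo->oi_cb_up().  If the cached obd_osfs was refreshed at or
 * after @max_age it is returned directly with OBD_STATFS_FROM_CACHE set
 * and no RPC is sent.  A hypothetical caller sketch (illustrative names
 * only):
 *
 *	oinfo->oi_cb_up = my_statfs_done;
 *	rc = osc_statfs_async(exp, oinfo, ktime_get_seconds() - 1, rqset);
 */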
2728 static int osc_statfs_async(struct obd_export *exp,
2729                             struct obd_info *oinfo, time64_t max_age,
2730                             struct ptlrpc_request_set *rqset)
2731 {
2732         struct obd_device     *obd = class_exp2obd(exp);
2733         struct ptlrpc_request *req;
2734         struct osc_async_args *aa;
2735         int rc;
2736         ENTRY;
2737
2738         if (obd->obd_osfs_age >= max_age) {
2739                 CDEBUG(D_SUPER,
2740                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2741                        obd->obd_name, &obd->obd_osfs,
2742                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2743                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2744                 spin_lock(&obd->obd_osfs_lock);
2745                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2746                 spin_unlock(&obd->obd_osfs_lock);
2747                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2748                 if (oinfo->oi_cb_up)
2749                         oinfo->oi_cb_up(oinfo, 0);
2750
2751                 RETURN(0);
2752         }
2753
2754         /* We could possibly pass max_age in the request (as an absolute
2755          * timestamp or a "seconds.usec ago") so the target can avoid doing
2756          * extra calls into the filesystem if that isn't necessary (e.g.
2757          * during mount that would help a bit).  Having relative timestamps
2758          * is not so great if request processing is slow, while absolute
2759          * timestamps are not ideal because they need time synchronization. */
2760         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2761         if (req == NULL)
2762                 RETURN(-ENOMEM);
2763
2764         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2765         if (rc) {
2766                 ptlrpc_request_free(req);
2767                 RETURN(rc);
2768         }
2769         ptlrpc_request_set_replen(req);
2770         req->rq_request_portal = OST_CREATE_PORTAL;
2771         ptlrpc_at_set_req_timeout(req);
2772
2773         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2774                 /* procfs requests must not block waiting for recovery, to
2775                  * avoid deadlock */
2775                 req->rq_no_resend = 1;
2776                 req->rq_no_delay = 1;
2777         }
2778
2779         req->rq_interpret_reply = osc_statfs_interpret;
2780         aa = ptlrpc_req_async_args(aa, req);
2781         aa->aa_oi = oinfo;
2782
2783         ptlrpc_set_add_req(rqset, req);
2784         RETURN(0);
2785 }
2786
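/*
 * Synchronous variant of the statfs path above: grab a reference on the
 * import under cl_sem (the caller may race with disconnect, see the
 * comment below), queue the OST_STATFS request, wait for the reply and
 * copy it into @osfs.
 */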
2787 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2788                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2789 {
2790         struct obd_device     *obd = class_exp2obd(exp);
2791         struct obd_statfs     *msfs;
2792         struct ptlrpc_request *req;
2793         struct obd_import     *imp = NULL;
2794         int rc;
2795         ENTRY;
2796
2797
2798         /* Since the request might also come from lprocfs, we need to
2799          * sync this with client_disconnect_export() (bug 15684). */
2800         down_read(&obd->u.cli.cl_sem);
2801         if (obd->u.cli.cl_import)
2802                 imp = class_import_get(obd->u.cli.cl_import);
2803         up_read(&obd->u.cli.cl_sem);
2804         if (!imp)
2805                 RETURN(-ENODEV);
2806
2807         /* We could possibly pass max_age in the request (as an absolute
2808          * timestamp or a "seconds.usec ago") so the target can avoid doing
2809          * extra calls into the filesystem if that isn't necessary (e.g.
2810          * during mount that would help a bit).  Having relative timestamps
2811          * is not so great if request processing is slow, while absolute
2812          * timestamps are not ideal because they need time synchronization. */
2813         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2814
2815         class_import_put(imp);
2816
2817         if (req == NULL)
2818                 RETURN(-ENOMEM);
2819
2820         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2821         if (rc) {
2822                 ptlrpc_request_free(req);
2823                 RETURN(rc);
2824         }
2825         ptlrpc_request_set_replen(req);
2826         req->rq_request_portal = OST_CREATE_PORTAL;
2827         ptlrpc_at_set_req_timeout(req);
2828
2829         if (flags & OBD_STATFS_NODELAY) {
2830                 /* procfs requests must not block waiting for recovery, to
2831                  * avoid deadlock */
2831                 req->rq_no_resend = 1;
2832                 req->rq_no_delay = 1;
2833         }
2834
2835         rc = ptlrpc_queue_wait(req);
2836         if (rc)
2837                 GOTO(out, rc);
2838
2839         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2840         if (msfs == NULL)
2841                 GOTO(out, rc = -EPROTO);
2842
2843         *osfs = *msfs;
2844
2845         EXIT;
2846 out:
2847         ptlrpc_req_finished(req);
2848         return rc;
2849 }
2850
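/*
 * ioctl entry point for the OSC device.  Only two commands are handled
 * here: OBD_IOC_CLIENT_RECOVER triggers import recovery against the NID
 * in ioc_inlbuf1, and IOC_OSC_SET_ACTIVE (de)activates the import based
 * on ioc_offset; everything else fails with -ENOTTY.
 */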
2851 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2852                          void *karg, void __user *uarg)
2853 {
2854         struct obd_device *obd = exp->exp_obd;
2855         struct obd_ioctl_data *data = karg;
2856         int rc = 0;
2857
2858         ENTRY;
2859         if (!try_module_get(THIS_MODULE)) {
2860                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2861                        module_name(THIS_MODULE));
2862                 return -EINVAL;
2863         }
2864         switch (cmd) {
2865         case OBD_IOC_CLIENT_RECOVER:
2866                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2867                                            data->ioc_inlbuf1, 0);
2868                 if (rc > 0)
2869                         rc = 0;
2870                 break;
2871         case IOC_OSC_SET_ACTIVE:
2872                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2873                                               data->ioc_offset);
2874                 break;
2875         default:
2876                 rc = -ENOTTY;
2877                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2878                        obd->obd_name, cmd, current_comm(), rc);
2879                 break;
2880         }
2881
2882         module_put(THIS_MODULE);
2883         return rc;
2884 }
2885
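/*
 * Handle "set info" keys for the OSC.  Checksum, sptlrpc and LRU-shrink
 * keys are acted on locally; anything else is packed into an OST_SET_INFO
 * (or, for grant shrinking, OST_SET_GRANT_INFO) request and sent to the
 * OST, either through the caller's @set or via ptlrpcd for
 * KEY_GRANT_SHRINK.  A hypothetical local use (illustrative only):
 *
 *	int on = 1;
 *
 *	rc = osc_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 */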
2886 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2887                        u32 keylen, void *key, u32 vallen, void *val,
2888                        struct ptlrpc_request_set *set)
2889 {
2890         struct ptlrpc_request *req;
2891         struct obd_device     *obd = exp->exp_obd;
2892         struct obd_import     *imp = class_exp2cliimp(exp);
2893         char                  *tmp;
2894         int                    rc;
2895         ENTRY;
2896
2897         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2898
2899         if (KEY_IS(KEY_CHECKSUM)) {
2900                 if (vallen != sizeof(int))
2901                         RETURN(-EINVAL);
2902                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2903                 RETURN(0);
2904         }
2905
2906         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2907                 sptlrpc_conf_client_adapt(obd);
2908                 RETURN(0);
2909         }
2910
2911         if (KEY_IS(KEY_FLUSH_CTX)) {
2912                 sptlrpc_import_flush_my_ctx(imp);
2913                 RETURN(0);
2914         }
2915
2916         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2917                 struct client_obd *cli = &obd->u.cli;
2918                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2919                 long target = *(long *)val;
2920
2921                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2922                 *(long *)val -= nr;
2923                 RETURN(0);
2924         }
2925
2926         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2927                 RETURN(-EINVAL);
2928
2929         /* We pass all other commands directly to OST.  Since nobody calls
2930          * osc methods directly and everybody is supposed to go through LOV,
2931          * we assume LOV checked invalid values for us.
2932          * The only recognised values so far are evict_by_nid and mds_conn.
2933          * Even if something bad goes through, we'd get a -EINVAL from OST
2934          * anyway. */
2935
2936         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2937                                                 &RQF_OST_SET_GRANT_INFO :
2938                                                 &RQF_OBD_SET_INFO);
2939         if (req == NULL)
2940                 RETURN(-ENOMEM);
2941
2942         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2943                              RCL_CLIENT, keylen);
2944         if (!KEY_IS(KEY_GRANT_SHRINK))
2945                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2946                                      RCL_CLIENT, vallen);
2947         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2948         if (rc) {
2949                 ptlrpc_request_free(req);
2950                 RETURN(rc);
2951         }
2952
2953         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2954         memcpy(tmp, key, keylen);
2955         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2956                                                         &RMF_OST_BODY :
2957                                                         &RMF_SETINFO_VAL);
2958         memcpy(tmp, val, vallen);
2959
2960         if (KEY_IS(KEY_GRANT_SHRINK)) {
2961                 struct osc_grant_args *aa;
2962                 struct obdo *oa;
2963
2964                 aa = ptlrpc_req_async_args(aa, req);
2965                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2966                 if (!oa) {
2967                         ptlrpc_req_finished(req);
2968                         RETURN(-ENOMEM);
2969                 }
2970                 *oa = ((struct ost_body *)val)->oa;
2971                 aa->aa_oa = oa;
2972                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2973         }
2974
2975         ptlrpc_request_set_replen(req);
2976         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2977                 LASSERT(set != NULL);
2978                 ptlrpc_set_add_req(set, req);
2979                 ptlrpc_check_set(NULL, set);
2980         } else {
2981                 ptlrpcd_add_req(req);
2982         }
2983
2984         RETURN(0);
2985 }
2986 EXPORT_SYMBOL(osc_set_info_async);
2987
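/*
 * On reconnect, recompute the grant to request from the server: the
 * still-available and reserved grant plus outstanding dirty data (in
 * bytes when OBD_CONNECT_GRANT_PARAM was negotiated, otherwise in whole
 * pages), with a floor of two full BRW RPCs; accumulated lost grant is
 * reported and reset under cl_loi_list_lock.
 */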
2988 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2989                   struct obd_device *obd, struct obd_uuid *cluuid,
2990                   struct obd_connect_data *data, void *localdata)
2991 {
2992         struct client_obd *cli = &obd->u.cli;
2993
2994         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2995                 long lost_grant;
2996                 long grant;
2997
2998                 spin_lock(&cli->cl_loi_list_lock);
2999                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3000                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3001                         /* restore ocd_grant_blkbits as client page bits */
3002                         data->ocd_grant_blkbits = PAGE_SHIFT;
3003                         grant += cli->cl_dirty_grant;
3004                 } else {
3005                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3006                 }
3007                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3008                 lost_grant = cli->cl_lost_grant;
3009                 cli->cl_lost_grant = 0;
3010                 spin_unlock(&cli->cl_loi_list_lock);
3011
3012                 CDEBUG(D_RPCTRACE,
3013                        "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3014                        data->ocd_connect_flags, data->ocd_version, data->ocd_grant, lost_grant);
3015         }
3016
3017         RETURN(0);
3018 }
3019 EXPORT_SYMBOL(osc_reconnect);
3020
3021 int osc_disconnect(struct obd_export *exp)
3022 {
3023         struct obd_device *obd = class_exp2obd(exp);
3024         int rc;
3025
3026         rc = client_disconnect_export(exp);
3027         /**
3028          * Initially we put del_shrink_grant before disconnect_export, but it
3029          * causes the following problem if setup (connect) and cleanup
3030          * (disconnect) are tangled together.
3031          *      connect p1                     disconnect p2
3032          *   ptlrpc_connect_import
3033          *     ...............               class_manual_cleanup
3034          *                                     osc_disconnect
3035          *                                     del_shrink_grant
3036          *   ptlrpc_connect_interpret
3037          *     osc_init_grant
3038          *   add this client to shrink list
3039          *                                      cleanup_osc
3040          * Bang! The grant shrink thread triggers the shrink (bug 18662).
3041          */
3042         osc_del_grant_list(&obd->u.cli);
3043         return rc;
3044 }
3045 EXPORT_SYMBOL(osc_disconnect);
3046
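/*
 * cfs_hash iterator run on import invalidation: for each LDLM resource it
 * clears LDLM_FL_CLEANED on every granted lock, so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() can cancel them,
 * and invalidates the osc_object attached to the locks' l_ast_data, if
 * any.
 */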
3047 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3048                                  struct hlist_node *hnode, void *arg)
3049 {
3050         struct lu_env *env = arg;
3051         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3052         struct ldlm_lock *lock;
3053         struct osc_object *osc = NULL;
3054         ENTRY;
3055
3056         lock_res(res);
3057         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3058                 if (lock->l_ast_data != NULL && osc == NULL) {
3059                         osc = lock->l_ast_data;
3060                         cl_object_get(osc2cl(osc));
3061                 }
3062
3063                 /* clear the LDLM_FL_CLEANED flag to make sure it will be
3064                  * canceled by the 2nd round of ldlm_namespace_cleanup() in
3065                  * osc_import_event(). */
3066                 ldlm_clear_cleaned(lock);
3067         }
3068         unlock_res(res);
3069
3070         if (osc != NULL) {
3071                 osc_object_invalidate(env, osc);
3072                 cl_object_put(env, osc2cl(osc));
3073         }
3074
3075         RETURN(0);
3076 }
3077 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3078
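/*
 * Dispatch import state changes: on disconnect the grant counters are
 * zeroed; on invalidation local DLM locks are cleaned up and cached I/O
 * unplugged; on OCD negotiation the grant accounting is (re)initialized
 * and the request portal switched; the remaining events are forwarded to
 * the observer (typically LOV).
 */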
3079 static int osc_import_event(struct obd_device *obd,
3080                             struct obd_import *imp,
3081                             enum obd_import_event event)
3082 {
3083         struct client_obd *cli;
3084         int rc = 0;
3085
3086         ENTRY;
3087         LASSERT(imp->imp_obd == obd);
3088
3089         switch (event) {
3090         case IMP_EVENT_DISCON: {
3091                 cli = &obd->u.cli;
3092                 spin_lock(&cli->cl_loi_list_lock);
3093                 cli->cl_avail_grant = 0;
3094                 cli->cl_lost_grant = 0;
3095                 spin_unlock(&cli->cl_loi_list_lock);
3096                 break;
3097         }
3098         case IMP_EVENT_INACTIVE: {
3099                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3100                 break;
3101         }
3102         case IMP_EVENT_INVALIDATE: {
3103                 struct ldlm_namespace *ns = obd->obd_namespace;
3104                 struct lu_env         *env;
3105                 __u16                  refcheck;
3106
3107                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3108
3109                 env = cl_env_get(&refcheck);
3110                 if (!IS_ERR(env)) {
3111                         osc_io_unplug(env, &obd->u.cli, NULL);
3112
3113                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3114                                                  osc_ldlm_resource_invalidate,
3115                                                  env, 0);
3116                         cl_env_put(env, &refcheck);
3117
3118                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3119                 } else
3120                         rc = PTR_ERR(env);
3121                 break;
3122         }
3123         case IMP_EVENT_ACTIVE: {
3124                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3125                 break;
3126         }
3127         case IMP_EVENT_OCD: {
3128                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3129
3130                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3131                         osc_init_grant(&obd->u.cli, ocd);
3132
3133                 /* See bug 7198 */
3134                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3135                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3136
3137                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3138                 break;
3139         }
3140         case IMP_EVENT_DEACTIVATE: {
3141                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3142                 break;
3143         }
3144         case IMP_EVENT_ACTIVATE: {
3145                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3146                 break;
3147         }
3148         default:
3149                 CERROR("Unknown import event %d\n", event);
3150                 LBUG();
3151         }
3152         RETURN(rc);
3153 }
3154
3155 /**
3156  * Determine whether the lock can be canceled before replaying the lock
3157  * during recovery, see bug16774 for detailed information.
3158  *
3159  * \retval zero the lock can't be canceled
3160  * \retval other ok to cancel
3161  */
3162 static int osc_cancel_weight(struct ldlm_lock *lock)
3163 {
3164         /*
3165          * Cancel all unused and granted extent locks.
3166          */
3167         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3168             ldlm_is_granted(lock) &&
3169             osc_ldlm_weigh_ast(lock) == 0)
3170                 RETURN(1);
3171
3172         RETURN(0);
3173 }
3174
3175 static int brw_queue_work(const struct lu_env *env, void *data)
3176 {
3177         struct client_obd *cli = data;
3178
3179         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3180
3181         osc_io_unplug(env, cli, NULL);
3182         RETURN(0);
3183 }
3184
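/*
 * Common part of OSC device setup, exported for reuse by code layered on
 * the OSC: take a ptlrpcd reference, run the generic client_obd_setup(),
 * and allocate the ptlrpcd work items used for writeback and LRU
 * shrinking; error paths unwind in reverse order.
 */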
3185 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3186 {
3187         struct client_obd *cli = &obd->u.cli;
3188         void *handler;
3189         int rc;
3190
3191         ENTRY;
3192
3193         rc = ptlrpcd_addref();
3194         if (rc)
3195                 RETURN(rc);
3196
3197         rc = client_obd_setup(obd, lcfg);
3198         if (rc)
3199                 GOTO(out_ptlrpcd, rc);
3200
3201
3202         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3203         if (IS_ERR(handler))
3204                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3205         cli->cl_writeback_work = handler;
3206
3207         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3208         if (IS_ERR(handler))
3209                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3210         cli->cl_lru_work = handler;
3211
3212         rc = osc_quota_setup(obd);
3213         if (rc)
3214                 GOTO(out_ptlrpcd_work, rc);
3215
3216         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3217         osc_update_next_shrink(cli);
3218
3219         RETURN(rc);
3220
3221 out_ptlrpcd_work:
3222         if (cli->cl_writeback_work != NULL) {
3223                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3224                 cli->cl_writeback_work = NULL;
3225         }
3226         if (cli->cl_lru_work != NULL) {
3227                 ptlrpcd_destroy_work(cli->cl_lru_work);
3228                 cli->cl_lru_work = NULL;
3229         }
3230         client_obd_cleanup(obd);
3231 out_ptlrpcd:
3232         ptlrpcd_decref();
3233         RETURN(rc);
3234 }
3235 EXPORT_SYMBOL(osc_setup_common);
3236
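/*
 * Full OSC setup on top of osc_setup_common(): register tunables, top up
 * the shared request pool by up to cl_max_rpcs_in_flight + 2 requests
 * (never exceeding osc_reqpool_maxreqcount), register the cancel-weight
 * callback and join the global grant-shrink list.
 */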
3237 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3238 {
3239         struct client_obd *cli = &obd->u.cli;
3240         int                adding;
3241         int                added;
3242         int                req_count;
3243         int                rc;
3244
3245         ENTRY;
3246
3247         rc = osc_setup_common(obd, lcfg);
3248         if (rc < 0)
3249                 RETURN(rc);
3250
3251         rc = osc_tunables_init(obd);
3252         if (rc)
3253                 RETURN(rc);
3254
3255         /*
3256          * We try to control the total number of requests with an upper limit
3257          * osc_reqpool_maxreqcount. There might be some race which will cause
3258          * over-limit allocation, but it is fine.
3259          */
3260         req_count = atomic_read(&osc_pool_req_count);
3261         if (req_count < osc_reqpool_maxreqcount) {
3262                 adding = cli->cl_max_rpcs_in_flight + 2;
3263                 if (req_count + adding > osc_reqpool_maxreqcount)
3264                         adding = osc_reqpool_maxreqcount - req_count;
3265
3266                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3267                 atomic_add(added, &osc_pool_req_count);
3268         }
3269
3270         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3271
3272         spin_lock(&osc_shrink_lock);
3273         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3274         spin_unlock(&osc_shrink_lock);
3275         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3276         cli->cl_import->imp_idle_debug = D_HA;
3277
3278         RETURN(0);
3279 }
3280
3281 int osc_precleanup_common(struct obd_device *obd)
3282 {
3283         struct client_obd *cli = &obd->u.cli;
3284         ENTRY;
3285
3286         /* LU-464
3287          * for echo client, export may be on zombie list, wait for
3288          * zombie thread to cull it, because cli.cl_import will be
3289          * cleared in client_disconnect_export():
3290          *   class_export_destroy() -> obd_cleanup() ->
3291          *   echo_device_free() -> echo_client_cleanup() ->
3292          *   obd_disconnect() -> osc_disconnect() ->
3293          *   client_disconnect_export()
3294          */
3295         obd_zombie_barrier();
3296         if (cli->cl_writeback_work) {
3297                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3298                 cli->cl_writeback_work = NULL;
3299         }
3300
3301         if (cli->cl_lru_work) {
3302                 ptlrpcd_destroy_work(cli->cl_lru_work);
3303                 cli->cl_lru_work = NULL;
3304         }
3305
3306         obd_cleanup_client_import(obd);
3307         RETURN(0);
3308 }
3309 EXPORT_SYMBOL(osc_precleanup_common);
3310
3311 static int osc_precleanup(struct obd_device *obd)
3312 {
3313         ENTRY;
3314
3315         osc_precleanup_common(obd);
3316
3317         ptlrpc_lprocfs_unregister_obd(obd);
3318         RETURN(0);
3319 }
3320
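/*
 * Undo osc_setup()/osc_setup_common(): leave the grant-shrink list,
 * detach from the shared client LRU cache, free the quota hash and drop
 * the ptlrpcd reference taken at setup time.
 */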
3321 int osc_cleanup_common(struct obd_device *obd)
3322 {
3323         struct client_obd *cli = &obd->u.cli;
3324         int rc;
3325
3326         ENTRY;
3327
3328         spin_lock(&osc_shrink_lock);
3329         list_del(&cli->cl_shrink_list);
3330         spin_unlock(&osc_shrink_lock);
3331
3332         /* lru cleanup */
3333         if (cli->cl_cache != NULL) {
3334                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3335                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3336                 list_del_init(&cli->cl_lru_osc);
3337                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3338                 cli->cl_lru_left = NULL;
3339                 cl_cache_decref(cli->cl_cache);
3340                 cli->cl_cache = NULL;
3341         }
3342
3343         /* free memory of osc quota cache */
3344         osc_quota_cleanup(obd);
3345
3346         rc = client_obd_cleanup(obd);
3347
3348         ptlrpcd_decref();
3349         RETURN(rc);
3350 }
3351 EXPORT_SYMBOL(osc_cleanup_common);
3352
3353 static struct obd_ops osc_obd_ops = {
3354         .o_owner                = THIS_MODULE,
3355         .o_setup                = osc_setup,
3356         .o_precleanup           = osc_precleanup,
3357         .o_cleanup              = osc_cleanup_common,
3358         .o_add_conn             = client_import_add_conn,
3359         .o_del_conn             = client_import_del_conn,
3360         .o_connect              = client_connect_import,
3361         .o_reconnect            = osc_reconnect,
3362         .o_disconnect           = osc_disconnect,
3363         .o_statfs               = osc_statfs,
3364         .o_statfs_async         = osc_statfs_async,
3365         .o_create               = osc_create,
3366         .o_destroy              = osc_destroy,
3367         .o_getattr              = osc_getattr,
3368         .o_setattr              = osc_setattr,
3369         .o_iocontrol            = osc_iocontrol,
3370         .o_set_info_async       = osc_set_info_async,
3371         .o_import_event         = osc_import_event,
3372         .o_quotactl             = osc_quotactl,
3373 };
3374
3375 static struct shrinker *osc_cache_shrinker;
3376 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3377 DEFINE_SPINLOCK(osc_shrink_lock);
3378
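/*
 * Compatibility wrapper for kernels whose struct shrinker exposes a
 * single ->shrink() method instead of separate ->count_objects() and
 * ->scan_objects(): both operations are funnelled through the newer
 * count/scan helpers.
 */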
3379 #ifndef HAVE_SHRINKER_COUNT
3380 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3381 {
3382         struct shrink_control scv = {
3383                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3384                 .gfp_mask   = shrink_param(sc, gfp_mask)
3385         };
3386 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3387         struct shrinker *shrinker = NULL;
3388 #endif
3389
3390         (void)osc_cache_shrink_scan(shrinker, &scv);
3391
3392         return osc_cache_shrink_count(shrinker, &scv);
3393 }
3394 #endif
3395
3396 static int __init osc_init(void)
3397 {
3398         unsigned int reqpool_size;
3399         unsigned int reqsize;
3400         int rc;
3401         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3402                          osc_cache_shrink_count, osc_cache_shrink_scan);
3403         ENTRY;
3404
3405         /* print an address of _any_ initialized kernel symbol from this
3406          * module, to allow debugging with gdb that doesn't support data
3407          * symbols from modules. */
3408         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3409
3410         rc = lu_kmem_init(osc_caches);
3411         if (rc)
3412                 RETURN(rc);
3413
3414         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3415                                  LUSTRE_OSC_NAME, &osc_device_type);
3416         if (rc)
3417                 GOTO(out_kmem, rc);
3418
3419         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3420
3421         /* This is obviously too much memory, only prevent overflow here */
3422         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3423                 GOTO(out_type, rc = -EINVAL);
3424
3425         reqpool_size = osc_reqpool_mem_max << 20;
3426
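	/* Round the request buffer size up to the smallest power of two
	 * that holds OST_IO_MAXREQSIZE; the pool can then hold at most
	 * reqpool_size / reqsize requests, e.g. 5 pooled requests with the
	 * default 5 MB cap if reqsize rounds up to 1 MB. */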
3427         reqsize = 1;
3428         while (reqsize < OST_IO_MAXREQSIZE)
3429                 reqsize = reqsize << 1;
3430
3431         /*
3432          * We don't enlarge the request count in OSC pool according to
3433          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3434          * tried after normal allocation failed. So a small OSC pool won't
3435          * cause much performance degradation in most cases.
3436          */
3437         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3438
3439         atomic_set(&osc_pool_req_count, 0);
3440         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3441                                           ptlrpc_add_rqs_to_pool);
3442
3443         if (osc_rq_pool == NULL)
3444                 GOTO(out_type, rc = -ENOMEM);
3445
3446         rc = osc_start_grant_work();
3447         if (rc != 0)
3448                 GOTO(out_req_pool, rc);
3449
3450         RETURN(rc);
3451
3452 out_req_pool:
3453         ptlrpc_free_rq_pool(osc_rq_pool);
3454 out_type:
3455         class_unregister_type(LUSTRE_OSC_NAME);
3456 out_kmem:
3457         lu_kmem_fini(osc_caches);
3458
3459         RETURN(rc);
3460 }
3461
3462 static void __exit osc_exit(void)
3463 {
3464         osc_stop_grant_work();
3465         remove_shrinker(osc_cache_shrinker);
3466         class_unregister_type(LUSTRE_OSC_NAME);
3467         lu_kmem_fini(osc_caches);
3468         ptlrpc_free_rq_pool(osc_rq_pool);
3469 }
3470
3471 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3472 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3473 MODULE_VERSION(LUSTRE_VERSION_STRING);
3474 MODULE_LICENSE("GPL");
3475
3476 module_init(osc_init);
3477 module_exit(osc_exit);