/* lustre/osc/osc_request.c (fs/lustre-release.git, commit 0ea2ad6eec4223127abd916f29f82747e66cec80) */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

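/*
 * Note: these *_args blocks are stashed in the request's async-args area
 * via ptlrpc_req_async_args() when the RPC is queued, and read back in the
 * matching rq_interpret_reply callback, so per-request state survives the
 * asynchronous round trip without a separate allocation.
 */
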
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

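/*
 * Illustrative caller sketch for osc_setattr_async() (hypothetical names,
 * error handling elided).  With a request set, sa_upcall fires from
 * osc_setattr_interpret() once the reply is interpreted; with a NULL rqset
 * the RPC is fire-and-forget via ptlrpcd:
 *
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, rqset);
 *	// ...the set is then typically driven to completion with
 *	// ptlrpc_set_wait(), which triggers the interpret callbacks.
 */
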
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may also
 * be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes this case from the one where ELC is not
         * supported at all, in which we still want to cancel locks in
         * advance, just locally and without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

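/*
 * Note: osc_can_send_destroy() optimistically takes a slot with
 * atomic_inc_return() and backs out with atomic_dec_return() when the
 * in-flight limit is exceeded.  The second comparison catches a concurrent
 * decrement that landed between the two atomics: if the count has already
 * dropped below the limit, a waiter on cl_destroy_waitq might otherwise
 * sleep through a wakeup that has passed, so one is re-issued here.
 */
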
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

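/*
 * Note on the fields announced above: o_dirty is how much dirty cache the
 * client currently holds for this target, o_undirty is how much additional
 * grant it would like, o_grant is the grant it currently holds, and
 * o_dropped is grant that was lost.  As a worked example, assuming 4KiB
 * pages, cl_max_pages_per_rpc = 256 (1MiB RPCs) and
 * cl_max_rpcs_in_flight = 8: nrpages = 256 * 9 = 2304 pages, so the client
 * asks to cover roughly 9MiB of undirty cache before any extent tax.
 */
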
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

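/*
 * Worked example for the two-step shrink above, assuming 4KiB pages,
 * cl_max_pages_per_rpc = 256 (1MiB RPCs) and cl_max_rpcs_in_flight = 8:
 * the first-stage target is (8 + 1) * 1MiB = 9MiB; if available grant is
 * already at or below 9MiB, the target drops to a single RPC worth, 1MiB.
 */
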
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

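/*
 * Worked example for the chunk alignment above, assuming PAGE_SHIFT = 12
 * (4KiB pages) and ocd_grant_blkbits = 16: cl_chunkbits = 16, so a chunk is
 * 16 pages and chunk_mask = ~15.  A cl_max_pages_per_rpc of 100 is rounded
 * up to (100 + 15) & ~15 = 112 pages, a whole number of chunks.
 */
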
/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

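/*
 * Worked example: with three 4096-byte pages and nob_read = 6144, page 0 is
 * fully covered by the read, page 1 has its tail zeroed from byte 2048
 * onward (EOF fell inside it), and page 2 is zeroed entirely by the second
 * loop.
 */
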
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

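/*
 * In other words, two pages can share one remote niobuf only when their
 * flags are identical and their byte ranges are contiguous
 * (p1->off + p1->count == p2->off).  Any flag difference refuses the merge;
 * if the difference involves flags outside the known-benign mask above
 * (e.g. OBD_BRW_SRVLOCK set on only one page), the CWARN asks for a report.
 */
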
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The number of guard slots remaining in the buffer should
                 * be enough to hold the checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

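/*
 * Note: the T10-PI path above is a two-level checksum.  Per-sector DIF
 * guard tags are generated by @fn (CRC or IP checksum, as selected by
 * obd_t10_cksum2dif()) into a bounce page, and the final wire checksum is
 * the OBD_CKSUM_T10_TOP hash computed over those packed guard tags,
 * flushed each time the bounce page fills up.
 */
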
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

1282 static int
1283 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1284                      u32 page_count, struct brw_page **pga,
1285                      struct ptlrpc_request **reqp, int resend)
1286 {
1287         struct ptlrpc_request   *req;
1288         struct ptlrpc_bulk_desc *desc;
1289         struct ost_body         *body;
1290         struct obd_ioobj        *ioobj;
1291         struct niobuf_remote    *niobuf;
1292         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1293         struct osc_brw_async_args *aa;
1294         struct req_capsule      *pill;
1295         struct brw_page *pg_prev;
1296         void *short_io_buf;
1297         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1298
1299         ENTRY;
1300         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1301                 RETURN(-ENOMEM); /* Recoverable */
1302         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1303                 RETURN(-EINVAL); /* Fatal */
1304
1305         if ((cmd & OBD_BRW_WRITE) != 0) {
1306                 opc = OST_WRITE;
1307                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1308                                                 osc_rq_pool,
1309                                                 &RQF_OST_BRW_WRITE);
1310         } else {
1311                 opc = OST_READ;
1312                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1313         }
1314         if (req == NULL)
1315                 RETURN(-ENOMEM);
1316
1317         for (niocount = i = 1; i < page_count; i++) {
1318                 if (!can_merge_pages(pga[i - 1], pga[i]))
1319                         niocount++;
1320         }
1321
1322         pill = &req->rq_pill;
1323         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1324                              sizeof(*ioobj));
1325         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1326                              niocount * sizeof(*niobuf));
1327
1328         for (i = 0; i < page_count; i++)
1329                 short_io_size += pga[i]->count;
1330
1331         /* Check if read/write is small enough to be a short io. */
1332         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1333             !imp_connect_shortio(cli->cl_import))
1334                 short_io_size = 0;
1335
1336         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1337                              opc == OST_READ ? 0 : short_io_size);
1338         if (opc == OST_READ)
1339                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1340                                      short_io_size);
1341
1342         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1343         if (rc) {
1344                 ptlrpc_request_free(req);
1345                 RETURN(rc);
1346         }
1347         osc_set_io_portal(req);
1348
1349         ptlrpc_at_set_req_timeout(req);
1350         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1351          * retry logic */
1352         req->rq_no_retry_einprogress = 1;
1353
1354         if (short_io_size != 0) {
1355                 desc = NULL;
1356                 short_io_buf = NULL;
1357                 goto no_bulk;
1358         }
1359
1360         desc = ptlrpc_prep_bulk_imp(req, page_count,
1361                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1362                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1363                         PTLRPC_BULK_PUT_SINK),
1364                 OST_BULK_PORTAL,
1365                 &ptlrpc_bulk_kiov_pin_ops);
1366
1367         if (desc == NULL)
1368                 GOTO(out, rc = -ENOMEM);
1369         /* NB request now owns desc and will free it when it gets freed */
1370 no_bulk:
1371         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1372         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1373         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1374         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1375
1376         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1377
1378         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1379          * and from_kgid(), because they are asynchronous. Fortunately, variable
1380          * oa contains valid o_uid and o_gid in these two operations.
1381          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1382          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1383          * other process logic */
1384         body->oa.o_uid = oa->o_uid;
1385         body->oa.o_gid = oa->o_gid;
1386
1387         obdo_to_ioobj(oa, ioobj);
1388         ioobj->ioo_bufcnt = niocount;
1389         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1390          * that might be send for this request.  The actual number is decided
1391          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1392          * "max - 1" for old client compatibility sending "0", and also so the
1393          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1394         if (desc != NULL)
1395                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1396         else /* short io */
1397                 ioobj_max_brw_set(ioobj, 0);
1398
1399         if (short_io_size != 0) {
1400                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1402                         body->oa.o_flags = 0;
1403                 }
1404                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1405                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1406                        short_io_size);
1407                 if (opc == OST_WRITE) {
1408                         short_io_buf = req_capsule_client_get(pill,
1409                                                               &RMF_SHORT_IO);
1410                         LASSERT(short_io_buf != NULL);
1411                 }
1412         }
1413
1414         LASSERT(page_count > 0);
1415         pg_prev = pga[0];
1416         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1417                 struct brw_page *pg = pga[i];
1418                 int poff = pg->off & ~PAGE_MASK;
1419
1420                 LASSERT(pg->count > 0);
1421                 /* make sure there is no gap in the middle of page array */
1422                 LASSERTF(page_count == 1 ||
1423                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1424                           ergo(i > 0 && i < page_count - 1,
1425                                poff == 0 && pg->count == PAGE_SIZE)   &&
1426                           ergo(i == page_count - 1, poff == 0)),
1427                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1428                          i, page_count, pg, pg->off, pg->count);
1429                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1430                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1431                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1432                          i, page_count,
1433                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1434                          pg_prev->pg, page_private(pg_prev->pg),
1435                          pg_prev->pg->index, pg_prev->off);
1436                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1437                         (pg->flag & OBD_BRW_SRVLOCK));
1438                 if (short_io_size != 0 && opc == OST_WRITE) {
1439                         unsigned char *ptr = kmap_atomic(pg->pg);
1440
1441                         LASSERT(short_io_size >= requested_nob + pg->count);
1442                         memcpy(short_io_buf + requested_nob,
1443                                ptr + poff,
1444                                pg->count);
1445                         kunmap_atomic(ptr);
1446                 } else if (short_io_size == 0) {
1447                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1448                                                          pg->count);
1449                 }
1450                 requested_nob += pg->count;
1451
1452                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1453                         niobuf--;
1454                         niobuf->rnb_len += pg->count;
1455                 } else {
1456                         niobuf->rnb_offset = pg->off;
1457                         niobuf->rnb_len    = pg->count;
1458                         niobuf->rnb_flags  = pg->flag;
1459                 }
1460                 pg_prev = pg;
1461         }
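             /* E.g. (illustrative): two 4 KiB pages at file offsets 0 and
              * 4096 satisfy can_merge_pages() and collapse into one remote
              * niobuf {rnb_offset = 0, rnb_len = 8192}, so the niobuf count
              * actually used can be smaller than page_count. */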
1462
1463         LASSERTF((void *)(niobuf - niocount) ==
1464                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1465                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1466                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1467
1468         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1469         if (resend) {
1470                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1471                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1472                         body->oa.o_flags = 0;
1473                 }
1474                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1475         }
1476
1477         if (osc_should_shrink_grant(cli))
1478                 osc_shrink_grant_local(cli, &body->oa);
1479
1480         /* size[REQ_REC_OFF] is still sizeof(*body) */
1481         if (opc == OST_WRITE) {
1482                 if (cli->cl_checksum &&
1483                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1484                         /* store cl_cksum_type in a local variable since
1485                          * it can be changed via lprocfs */
1486                         enum cksum_types cksum_type = cli->cl_cksum_type;
1487
1488                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1489                                 body->oa.o_flags = 0;
1490
1491                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1492                                                                 cksum_type);
1493                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1494
1495                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1496                                                   requested_nob, page_count,
1497                                                   pga, OST_WRITE,
1498                                                   &body->oa.o_cksum);
1499                         if (rc < 0) {
1500                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1501                                        rc);
1502                                 GOTO(out, rc);
1503                         }
1504                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1505                                body->oa.o_cksum);
1506
1507                         /* save this in 'oa', too, for later checking */
1508                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1509                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1510                                                            cksum_type);
1511                 } else {
1512                         /* clear out the checksum flag, in case this is a
1513                          * resend but cl_checksum is no longer set. b=11238 */
1514                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1515                 }
1516                 oa->o_cksum = body->oa.o_cksum;
1517                 /* 1 RC per niobuf */
1518                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1519                                      sizeof(__u32) * niocount);
1520         } else {
1521                 if (cli->cl_checksum &&
1522                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1523                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1524                                 body->oa.o_flags = 0;
1525                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1526                                 cli->cl_cksum_type);
1527                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1528                 }
1529
1530                 /* Client cksum has already been copied to the wire obdo in the
1531                  * previous lustre_set_wire_obdo(); in case a bulk-read is being
1532                  * resent due to a cksum error, this allows the server to
1533                  * check+dump the pages on its side */
1534         }
1535         ptlrpc_request_set_replen(req);
1536
1537         aa = ptlrpc_req_async_args(aa, req);
1538         aa->aa_oa = oa;
1539         aa->aa_requested_nob = requested_nob;
1540         aa->aa_nio_count = niocount;
1541         aa->aa_page_count = page_count;
1542         aa->aa_resends = 0;
1543         aa->aa_ppga = pga;
1544         aa->aa_cli = cli;
1545         INIT_LIST_HEAD(&aa->aa_oaps);
1546
1547         *reqp = req;
1548         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1549         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1550                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1551                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1552         RETURN(0);
1553
1554  out:
1555         ptlrpc_req_finished(req);
1556         RETURN(rc);
1557 }
1558
1559 char dbgcksum_file_name[PATH_MAX];
1560
1561 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1562                                 struct brw_page **pga, __u32 server_cksum,
1563                                 __u32 client_cksum)
1564 {
1565         struct file *filp;
1566         int rc, i;
1567         unsigned int len;
1568         char *buf;
1569
1570         /* We only keep a dump of the pages on the first error for a given
1571          * range in the file/fid, not during the resends/retries. */
1572         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1573                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1574                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1575                   libcfs_debug_file_path_arr :
1576                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1577                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1578                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1579                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1580                  pga[0]->off,
1581                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1582                  client_cksum, server_cksum);
1583         filp = filp_open(dbgcksum_file_name,
1584                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1585         if (IS_ERR(filp)) {
1586                 rc = PTR_ERR(filp);
1587                 if (rc == -EEXIST)
1588                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1589                                "checksum error: rc = %d\n", dbgcksum_file_name,
1590                                rc);
1591                 else
1592                         CERROR("%s: can't open to dump pages with checksum "
1593                                "error: rc = %d\n", dbgcksum_file_name, rc);
1594                 return;
1595         }
1596
1597         for (i = 0; i < page_count; i++) {
1598                 len = pga[i]->count;
1599                 buf = kmap(pga[i]->pg);
1600                 while (len != 0) {
1601                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1602                         if (rc < 0) {
1603                                 CERROR("%s: wanted to write %u but got %d "
1604                                        "error\n", dbgcksum_file_name, len, rc);
1605                                 break;
1606                         }
1607                         len -= rc;
1608                         buf += rc;
1609                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1610                                dbgcksum_file_name, rc);
1611                 }
1612                 kunmap(pga[i]->pg);
1613         }
1614
1615         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1616         if (rc)
1617                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1618         filp_close(filp, NULL);
1619 }
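     /* A resulting dump path looks roughly like (illustrative values):
      *   /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-<client>-<server>
      * i.e. debug-file prefix, parent FID, byte range, then the two
      * checksums; the O_CREAT | O_EXCL open above is what limits the dump to
      * the first error seen for a given range. */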
1620
1621 static int
1622 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1623                      __u32 client_cksum, __u32 server_cksum,
1624                      struct osc_brw_async_args *aa)
1625 {
1626         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1627         enum cksum_types cksum_type;
1628         obd_dif_csum_fn *fn = NULL;
1629         int sector_size = 0;
1630         __u32 new_cksum;
1631         char *msg;
1632         int rc;
1633
1634         if (server_cksum == client_cksum) {
1635                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1636                 return 0;
1637         }
1638
1639         if (aa->aa_cli->cl_checksum_dump)
1640                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1641                                     server_cksum, client_cksum);
1642
1643         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1644                                            oa->o_flags : 0);
1645
1646         switch (cksum_type) {
1647         case OBD_CKSUM_T10IP512:
1648                 fn = obd_dif_ip_fn;
1649                 sector_size = 512;
1650                 break;
1651         case OBD_CKSUM_T10IP4K:
1652                 fn = obd_dif_ip_fn;
1653                 sector_size = 4096;
1654                 break;
1655         case OBD_CKSUM_T10CRC512:
1656                 fn = obd_dif_crc_fn;
1657                 sector_size = 512;
1658                 break;
1659         case OBD_CKSUM_T10CRC4K:
1660                 fn = obd_dif_crc_fn;
1661                 sector_size = 4096;
1662                 break;
1663         default:
1664                 break;
1665         }
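             /* Roughly (a summary): the T10-PI types compute a 16-bit guard
              * tag per 512- or 4096-byte sector with the IP or CRC function
              * chosen above, then fold the tags into the single 32-bit
              * checksum being compared; all other types leave fn NULL and
              * take the plain bulk checksum path below. */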
1666
1667         if (fn)
1668                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1669                                              aa->aa_page_count, aa->aa_ppga,
1670                                              OST_WRITE, fn, sector_size,
1671                                              &new_cksum);
1672         else
1673                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1674                                        aa->aa_ppga, OST_WRITE, cksum_type,
1675                                        &new_cksum);
1676
1677         if (rc < 0)
1678                 msg = "failed to calculate the client write checksum";
1679         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1680                 msg = "the server did not use the checksum type specified in "
1681                       "the original request - likely a protocol problem";
1682         else if (new_cksum == server_cksum)
1683                 msg = "changed on the client after we checksummed it - "
1684                       "likely false positive due to mmap IO (bug 11742)";
1685         else if (new_cksum == client_cksum)
1686                 msg = "changed in transit before arrival at OST";
1687         else
1688                 msg = "changed in transit AND doesn't match the original - "
1689                       "likely false positive due to mmap IO (bug 11742)";
1690
1691         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1692                            DFID " object "DOSTID" extent [%llu-%llu], original "
1693                            "client csum %x (type %x), server csum %x (type %x),"
1694                            " client csum now %x\n",
1695                            obd_name, msg, libcfs_nid2str(peer->nid),
1696                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1697                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1698                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1699                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1700                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1701                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1702                            client_cksum,
1703                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1704                            server_cksum, cksum_type, new_cksum);
1705         return 1;
1706 }
1707
1708 /* Note: rc enters this function as the number of bytes transferred */
1709 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1710 {
1711         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1712         struct client_obd *cli = aa->aa_cli;
1713         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1714         const struct lnet_process_id *peer =
1715                 &req->rq_import->imp_connection->c_peer;
1716         struct ost_body *body;
1717         u32 client_cksum = 0;
1718
1719         ENTRY;
1720
1721         if (rc < 0 && rc != -EDQUOT) {
1722                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1723                 RETURN(rc);
1724         }
1725
1726         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1727         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1728         if (body == NULL) {
1729                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1730                 RETURN(-EPROTO);
1731         }
1732
1733         /* set/clear over quota flag for a uid/gid/projid */
1734         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1735             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1736                 unsigned qid[LL_MAXQUOTAS] = {
1737                                          body->oa.o_uid, body->oa.o_gid,
1738                                          body->oa.o_projid };
1739                 CDEBUG(D_QUOTA,
1740                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1741                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1742                        body->oa.o_valid, body->oa.o_flags);
1743                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1744                                 body->oa.o_flags);
1745         }
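             /* Roughly (a summary, not from the original comments): an
              * over-quota flag in the reply (e.g. OBD_FL_NO_USRQUOTA) marks
              * the matching ID over quota, so later writes for it are issued
              * synchronously and hit EDQUOT promptly; a reply carrying the
              * quota valid bit without the flag clears that state and
              * restores normal cached writes. */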
1746
1747         osc_update_grant(cli, body);
1748
1749         if (rc < 0)
1750                 RETURN(rc);
1751
1752         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1753                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1754
1755         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1756                 if (rc > 0) {
1757                         CERROR("%s: unexpected positive size %d\n",
1758                                obd_name, rc);
1759                         RETURN(-EPROTO);
1760                 }
1761
1762                 if (req->rq_bulk != NULL &&
1763                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1764                         RETURN(-EAGAIN);
1765
1766                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1767                     check_write_checksum(&body->oa, peer, client_cksum,
1768                                          body->oa.o_cksum, aa))
1769                         RETURN(-EAGAIN);
1770
1771                 rc = check_write_rcs(req, aa->aa_requested_nob,
1772                                      aa->aa_nio_count, aa->aa_page_count,
1773                                      aa->aa_ppga);
1774                 GOTO(out, rc);
1775         }
1776
1777         /* The rest of this function executes only for OST_READs */
1778
1779         if (req->rq_bulk == NULL) {
1780                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1781                                           RCL_SERVER);
1782                 LASSERT(rc == req->rq_status);
1783         } else {
1784                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1785                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1786         }
1787         if (rc < 0)
1788                 GOTO(out, rc = -EAGAIN);
1789
1790         if (rc > aa->aa_requested_nob) {
1791                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1792                        rc, aa->aa_requested_nob);
1793                 RETURN(-EPROTO);
1794         }
1795
1796         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1797                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1798                        rc, req->rq_bulk->bd_nob_transferred);
1799                 RETURN(-EPROTO);
1800         }
1801
1802         if (req->rq_bulk == NULL) {
1803                 /* short io */
1804                 int nob, pg_count, i = 0;
1805                 unsigned char *buf;
1806
1807                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1808                 pg_count = aa->aa_page_count;
1809                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1810                                                    rc);
1811                 nob = rc;
1812                 while (nob > 0 && pg_count > 0) {
1813                         unsigned char *ptr;
1814                         int count = aa->aa_ppga[i]->count > nob ?
1815                                     nob : aa->aa_ppga[i]->count;
1816
1817                         CDEBUG(D_CACHE, "page %p count %d\n",
1818                                aa->aa_ppga[i]->pg, count);
1819                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1820                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1821                                count);
1822                         kunmap_atomic((void *) ptr);
1823
1824                         buf += count;
1825                         nob -= count;
1826                         i++;
1827                         pg_count--;
1828                 }
1829         }
1830
1831         if (rc < aa->aa_requested_nob)
1832                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1833
1834         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1835                 static int cksum_counter;
1836                 u32        server_cksum = body->oa.o_cksum;
1837                 char      *via = "";
1838                 char      *router = "";
1839                 enum cksum_types cksum_type;
1840                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1841                         body->oa.o_flags : 0;
1842
1843                 cksum_type = obd_cksum_type_unpack(o_flags);
1844                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1845                                           aa->aa_page_count, aa->aa_ppga,
1846                                           OST_READ, &client_cksum);
1847                 if (rc < 0)
1848                         GOTO(out, rc);
1849
1850                 if (req->rq_bulk != NULL &&
1851                     peer->nid != req->rq_bulk->bd_sender) {
1852                         via = " via ";
1853                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1854                 }
1855
1856                 if (server_cksum != client_cksum) {
1857                         struct ost_body *clbody;
1858                         u32 page_count = aa->aa_page_count;
1859
1860                         clbody = req_capsule_client_get(&req->rq_pill,
1861                                                         &RMF_OST_BODY);
1862                         if (cli->cl_checksum_dump)
1863                                 dump_all_bulk_pages(&clbody->oa, page_count,
1864                                                     aa->aa_ppga, server_cksum,
1865                                                     client_cksum);
1866
1867                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1868                                            "%s%s%s inode "DFID" object "DOSTID
1869                                            " extent [%llu-%llu], client %x, "
1870                                            "server %x, cksum_type %x\n",
1871                                            obd_name,
1872                                            libcfs_nid2str(peer->nid),
1873                                            via, router,
1874                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1875                                                 clbody->oa.o_parent_seq : 0ULL,
1876                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1877                                                 clbody->oa.o_parent_oid : 0,
1878                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1879                                                 clbody->oa.o_parent_ver : 0,
1880                                            POSTID(&body->oa.o_oi),
1881                                            aa->aa_ppga[0]->off,
1882                                            aa->aa_ppga[page_count-1]->off +
1883                                            aa->aa_ppga[page_count-1]->count - 1,
1884                                            client_cksum, server_cksum,
1885                                            cksum_type);
1886                         cksum_counter = 0;
1887                         aa->aa_oa->o_cksum = client_cksum;
1888                         rc = -EAGAIN;
1889                 } else {
1890                         cksum_counter++;
1891                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1892                         rc = 0;
1893                 }
1894         } else if (unlikely(client_cksum)) {
1895                 static int cksum_missed;
1896
1897                 cksum_missed++;
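                     /* log only when cksum_missed is a power of two, as
                      * crude rate limiting */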
1898                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1899                         CERROR("%s: checksum %u requested from %s but not sent\n",
1900                                obd_name, cksum_missed,
1901                                libcfs_nid2str(peer->nid));
1902         } else {
1903                 rc = 0;
1904         }
1905 out:
1906         if (rc >= 0)
1907                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1908                                      aa->aa_oa, &body->oa);
1909
1910         RETURN(rc);
1911 }
1912
1913 static int osc_brw_redo_request(struct ptlrpc_request *request,
1914                                 struct osc_brw_async_args *aa, int rc)
1915 {
1916         struct ptlrpc_request *new_req;
1917         struct osc_brw_async_args *new_aa;
1918         struct osc_async_page *oap;
1919         ENTRY;
1920
1921         /* The below message is checked in replay-ost-single.sh test_8ae */
1922         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1923                   "redo for recoverable error %d", rc);
1924
1925         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1926                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1927                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1928                                   aa->aa_ppga, &new_req, 1);
1929         if (rc)
1930                 RETURN(rc);
1931
1932         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1933                 if (oap->oap_request != NULL) {
1934                         LASSERTF(request == oap->oap_request,
1935                                  "request %p != oap_request %p\n",
1936                                  request, oap->oap_request);
1937                 }
1938         }
1939         /*
1940          * New request takes over pga and oaps from old request.
1941          * Note that copying a list_head doesn't work, need to move it...
1942          */
1943         aa->aa_resends++;
1944         new_req->rq_interpret_reply = request->rq_interpret_reply;
1945         new_req->rq_async_args = request->rq_async_args;
1946         new_req->rq_commit_cb = request->rq_commit_cb;
1947         /* Cap the resend delay to the current request timeout; this is
1948          * similar to what ptlrpc does (see after_reply()) */
1949         if (aa->aa_resends > new_req->rq_timeout)
1950                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1951         else
1952                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
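             /* E.g. (illustrative): with rq_timeout = 30s the 3rd resend is
              * deferred by 3 seconds and the 50th by the full 30, i.e. the
              * delay is min(aa_resends, rq_timeout) seconds from now. */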
1953         new_req->rq_generation_set = 1;
1954         new_req->rq_import_generation = request->rq_import_generation;
1955
1956         new_aa = ptlrpc_req_async_args(new_aa, new_req);
1957
1958         INIT_LIST_HEAD(&new_aa->aa_oaps);
1959         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1960         INIT_LIST_HEAD(&new_aa->aa_exts);
1961         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1962         new_aa->aa_resends = aa->aa_resends;
1963
1964         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1965                 if (oap->oap_request) {
1966                         ptlrpc_req_finished(oap->oap_request);
1967                         oap->oap_request = ptlrpc_request_addref(new_req);
1968                 }
1969         }
1970
1971         /* XXX: This code will run into problems if we ever support adding
1972          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1973          * waiting for all of them to finish. We should inherit the request
1974          * set from the old request. */
1975         ptlrpcd_add_req(new_req);
1976
1977         DEBUG_REQ(D_INFO, new_req, "new request");
1978         RETURN(0);
1979 }
1980
1981 /*
1982  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1983  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1984  * fine for our small page arrays and doesn't require allocation.  It's an
1985  * insertion sort that swaps elements that are strides apart, shrinking the
1986  * stride down until it's '1' and the array is sorted.
1987  */
1988 static void sort_brw_pages(struct brw_page **array, int num)
1989 {
1990         int stride, i, j;
1991         struct brw_page *tmp;
1992
1993         if (num == 1)
1994                 return;
1995         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1996                 ;
1997
1998         do {
1999                 stride /= 3;
2000                 for (i = stride ; i < num ; i++) {
2001                         tmp = array[i];
2002                         j = i;
2003                         while (j >= stride && array[j - stride]->off > tmp->off) {
2004                                 array[j] = array[j - stride];
2005                                 j -= stride;
2006                         }
2007                         array[j] = tmp;
2008                 }
2009         } while (stride > 1);
2010 }
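     /* Worked example (illustrative): for num = 100 the gap loop above
      * generates the Knuth sequence 1, 4, 13, 40, 121 and exits at 121; the
      * do/while then sorts with strides 40, 13, 4 and finally 1, where the
      * last pass is a plain insertion sort over an almost-sorted array. */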
2011
2012 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2013 {
2014         LASSERT(ppga != NULL);
2015         OBD_FREE_PTR_ARRAY(ppga, count);
2016 }
2017
2018 static int brw_interpret(const struct lu_env *env,
2019                          struct ptlrpc_request *req, void *args, int rc)
2020 {
2021         struct osc_brw_async_args *aa = args;
2022         struct osc_extent *ext;
2023         struct osc_extent *tmp;
2024         struct client_obd *cli = aa->aa_cli;
2025         unsigned long transferred = 0;
2026
2027         ENTRY;
2028
2029         rc = osc_brw_fini_request(req, rc);
2030         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2031         /*
2032          * When server returns -EINPROGRESS, client should always retry
2033          * regardless of the number of times the bulk was resent already.
2034          */
2035         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2036                 if (req->rq_import_generation !=
2037                     req->rq_import->imp_generation) {
2038                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2039                                ""DOSTID", rc = %d.\n",
2040                                req->rq_import->imp_obd->obd_name,
2041                                POSTID(&aa->aa_oa->o_oi), rc);
2042                 } else if (rc == -EINPROGRESS ||
2043                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2044                         rc = osc_brw_redo_request(req, aa, rc);
2045                 } else {
2046                         CERROR("%s: too many resent retries for object: "
2047                                "%llu:%llu, rc = %d.\n",
2048                                req->rq_import->imp_obd->obd_name,
2049                                POSTID(&aa->aa_oa->o_oi), rc);
2050                 }
2051
2052                 if (rc == 0)
2053                         RETURN(0);
2054                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2055                         rc = -EIO;
2056         }
2057
2058         if (rc == 0) {
2059                 struct obdo *oa = aa->aa_oa;
2060                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2061                 unsigned long valid = 0;
2062                 struct cl_object *obj;
2063                 struct osc_async_page *last;
2064
2065                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2066                 obj = osc2cl(last->oap_obj);
2067
2068                 cl_object_attr_lock(obj);
2069                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2070                         attr->cat_blocks = oa->o_blocks;
2071                         valid |= CAT_BLOCKS;
2072                 }
2073                 if (oa->o_valid & OBD_MD_FLMTIME) {
2074                         attr->cat_mtime = oa->o_mtime;
2075                         valid |= CAT_MTIME;
2076                 }
2077                 if (oa->o_valid & OBD_MD_FLATIME) {
2078                         attr->cat_atime = oa->o_atime;
2079                         valid |= CAT_ATIME;
2080                 }
2081                 if (oa->o_valid & OBD_MD_FLCTIME) {
2082                         attr->cat_ctime = oa->o_ctime;
2083                         valid |= CAT_CTIME;
2084                 }
2085
2086                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2087                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2088                         loff_t last_off = last->oap_count + last->oap_obj_off +
2089                                 last->oap_page_off;
2090
2091                         /* Change file size if this is an out of quota or
2092                          * direct IO write and it extends the file size */
2093                         if (loi->loi_lvb.lvb_size < last_off) {
2094                                 attr->cat_size = last_off;
2095                                 valid |= CAT_SIZE;
2096                         }
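                             /* KMS = "known minimum size", the client's
                              * lower bound on the file size under its DLM
                              * locks; e.g. (illustrative) a cached write
                              * ending at 1 MiB raises cat_kms to 1 MiB so
                              * later reads below that offset need no OST
                              * getattr.  Lockless (srvlock) writes are
                              * skipped below since no lock covers them. */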
2097                         /* Extend KMS if it's not a lockless write */
2098                         if (loi->loi_kms < last_off &&
2099                             oap2osc_page(last)->ops_srvlock == 0) {
2100                                 attr->cat_kms = last_off;
2101                                 valid |= CAT_KMS;
2102                         }
2103                 }
2104
2105                 if (valid != 0)
2106                         cl_object_attr_update(env, obj, attr, valid);
2107                 cl_object_attr_unlock(obj);
2108         }
2109         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2110         aa->aa_oa = NULL;
2111
2112         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2113                 osc_inc_unstable_pages(req);
2114
2115         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2116                 list_del_init(&ext->oe_link);
2117                 osc_extent_finish(env, ext, 1,
2118                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2119         }
2120         LASSERT(list_empty(&aa->aa_exts));
2121         LASSERT(list_empty(&aa->aa_oaps));
2122
2123         transferred = (req->rq_bulk == NULL ? /* short io */
2124                        aa->aa_requested_nob :
2125                        req->rq_bulk->bd_nob_transferred);
2126
2127         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2128         ptlrpc_lprocfs_brw(req, transferred);
2129
2130         spin_lock(&cli->cl_loi_list_lock);
2131         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2132          * is called so we know whether to go to sync BRWs or wait for more
2133          * RPCs to complete */
2134         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2135                 cli->cl_w_in_flight--;
2136         else
2137                 cli->cl_r_in_flight--;
2138         osc_wake_cache_waiters(cli);
2139         spin_unlock(&cli->cl_loi_list_lock);
2140
2141         osc_io_unplug(env, cli, NULL);
2142         RETURN(rc);
2143 }
2144
2145 static void brw_commit(struct ptlrpc_request *req)
2146 {
2147         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2148          * this being called via rq_commit_cb, we need to ensure
2149          * osc_dec_unstable_pages is still called. Otherwise unstable
2150          * pages may be leaked. */
2151         spin_lock(&req->rq_lock);
2152         if (likely(req->rq_unstable)) {
2153                 req->rq_unstable = 0;
2154                 spin_unlock(&req->rq_lock);
2155
2156                 osc_dec_unstable_pages(req);
2157         } else {
2158                 req->rq_committed = 1;
2159                 spin_unlock(&req->rq_lock);
2160         }
2161 }
2162
2163 /**
2164  * Build an RPC from the list of extents @ext_list. The caller must ensure
2165  * that the total number of pages in this list is NOT over max pages per RPC.
2166  * Extents in the list must be in the OES_RPC state.
2167  */
2168 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2169                   struct list_head *ext_list, int cmd)
2170 {
2171         struct ptlrpc_request           *req = NULL;
2172         struct osc_extent               *ext;
2173         struct brw_page                 **pga = NULL;
2174         struct osc_brw_async_args       *aa = NULL;
2175         struct obdo                     *oa = NULL;
2176         struct osc_async_page           *oap;
2177         struct osc_object               *obj = NULL;
2178         struct cl_req_attr              *crattr = NULL;
2179         loff_t                          starting_offset = OBD_OBJECT_EOF;
2180         loff_t                          ending_offset = 0;
2181         /* '1' for consistency with code that checks !mpflag to restore */
2182         int mpflag = 1;
2183         int                             mem_tight = 0;
2184         int                             page_count = 0;
2185         bool                            soft_sync = false;
2186         bool                            ndelay = false;
2187         int                             i;
2188         int                             grant = 0;
2189         int                             rc;
2190         __u32                           layout_version = 0;
2191         LIST_HEAD(rpc_list);
2192         struct ost_body                 *body;
2193         ENTRY;
2194         LASSERT(!list_empty(ext_list));
2195
2196         /* add pages into rpc_list to build BRW rpc */
2197         list_for_each_entry(ext, ext_list, oe_link) {
2198                 LASSERT(ext->oe_state == OES_RPC);
2199                 mem_tight |= ext->oe_memalloc;
2200                 grant += ext->oe_grants;
2201                 page_count += ext->oe_nr_pages;
2202                 layout_version = max(layout_version, ext->oe_layout_version);
2203                 if (obj == NULL)
2204                         obj = ext->oe_obj;
2205         }
2206
2207         soft_sync = osc_over_unstable_soft_limit(cli);
2208         if (mem_tight)
2209                 mpflag = memalloc_noreclaim_save();
2210
2211         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2212         if (pga == NULL)
2213                 GOTO(out, rc = -ENOMEM);
2214
2215         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2216         if (oa == NULL)
2217                 GOTO(out, rc = -ENOMEM);
2218
2219         i = 0;
2220         list_for_each_entry(ext, ext_list, oe_link) {
2221                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2222                         if (mem_tight)
2223                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2224                         if (soft_sync)
2225                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2226                         pga[i] = &oap->oap_brw_page;
2227                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2228                         i++;
2229
2230                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2231                         if (starting_offset == OBD_OBJECT_EOF ||
2232                             starting_offset > oap->oap_obj_off)
2233                                 starting_offset = oap->oap_obj_off;
2234                         else
2235                                 LASSERT(oap->oap_page_off == 0);
2236                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2237                                 ending_offset = oap->oap_obj_off +
2238                                                 oap->oap_count;
2239                         else
2240                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2241                                         PAGE_SIZE);
2242                 }
2243                 if (ext->oe_ndelay)
2244                         ndelay = true;
2245         }
2246
2247         /* first page in the list */
2248         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2249
2250         crattr = &osc_env_info(env)->oti_req_attr;
2251         memset(crattr, 0, sizeof(*crattr));
2252         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2253         crattr->cra_flags = ~0ULL;
2254         crattr->cra_page = oap2cl_page(oap);
2255         crattr->cra_oa = oa;
2256         cl_req_attr_set(env, osc2cl(obj), crattr);
2257
2258         if (cmd == OBD_BRW_WRITE) {
2259                 oa->o_grant_used = grant;
2260                 if (layout_version > 0) {
2261                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2262                                PFID(&oa->o_oi.oi_fid), layout_version);
2263
2264                         oa->o_layout_version = layout_version;
2265                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2266                 }
2267         }
2268
2269         sort_brw_pages(pga, page_count);
2270         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2271         if (rc != 0) {
2272                 CERROR("prep_req failed: %d\n", rc);
2273                 GOTO(out, rc);
2274         }
2275
2276         req->rq_commit_cb = brw_commit;
2277         req->rq_interpret_reply = brw_interpret;
2278         req->rq_memalloc = mem_tight != 0;
2279         oap->oap_request = ptlrpc_request_addref(req);
2280         if (ndelay) {
2281                 req->rq_no_resend = req->rq_no_delay = 1;
2282                 /* We should probably set a shorter timeout value here to
2283                  * handle ETIMEDOUT in brw_interpret() correctly. */
2284                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2285         }
2286
2287         /* Need to update the timestamps after the request is built in case
2288          * we race with setattr (locally or in queue at the OST).  If the OST
2289          * gets a later setattr before an earlier BRW (as determined by the
2290          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2291          * is no obvious way to do this in a single call.  bug 10150 */
2292         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2293         crattr->cra_oa = &body->oa;
2294         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2295         cl_req_attr_set(env, osc2cl(obj), crattr);
2296         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2297
2298         aa = ptlrpc_req_async_args(aa, req);
2299         INIT_LIST_HEAD(&aa->aa_oaps);
2300         list_splice_init(&rpc_list, &aa->aa_oaps);
2301         INIT_LIST_HEAD(&aa->aa_exts);
2302         list_splice_init(ext_list, &aa->aa_exts);
2303
2304         spin_lock(&cli->cl_loi_list_lock);
2305         starting_offset >>= PAGE_SHIFT;
2306         if (cmd == OBD_BRW_READ) {
2307                 cli->cl_r_in_flight++;
2308                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2309                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2310                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2311                                       starting_offset + 1);
2312         } else {
2313                 cli->cl_w_in_flight++;
2314                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2315                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2316                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2317                                       starting_offset + 1);
2318         }
2319         spin_unlock(&cli->cl_loi_list_lock);
2320
2321         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2322                   page_count, aa, cli->cl_r_in_flight,
2323                   cli->cl_w_in_flight);
2324         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2325
2326         ptlrpcd_add_req(req);
2327         rc = 0;
2328         EXIT;
2329
2330 out:
2331         if (mem_tight)
2332                 memalloc_noreclaim_restore(mpflag);
2333
2334         if (rc != 0) {
2335                 LASSERT(req == NULL);
2336
2337                 if (oa)
2338                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2339                 if (pga)
2340                         OBD_FREE_PTR_ARRAY(pga, page_count);
2341                 /* this should happen rarely and is pretty bad; it makes the
2342                  * pending list not follow the dirty order */
2343                 while (!list_empty(ext_list)) {
2344                         ext = list_entry(ext_list->next, struct osc_extent,
2345                                          oe_link);
2346                         list_del_init(&ext->oe_link);
2347                         osc_extent_finish(env, ext, 0, rc);
2348                 }
2349         }
2350         RETURN(rc);
2351 }
2352
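     /* Set @data as @lock's l_ast_data if it is unset; returns 1 if the
      * lock's l_ast_data now equals @data (whether just set here or set
      * previously), 0 if the lock already belongs to different data. */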
2353 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2354 {
2355         int set = 0;
2356
2357         LASSERT(lock != NULL);
2358
2359         lock_res_and_lock(lock);
2360
2361         if (lock->l_ast_data == NULL)
2362                 lock->l_ast_data = data;
2363         if (lock->l_ast_data == data)
2364                 set = 1;
2365
2366         unlock_res_and_lock(lock);
2367
2368         return set;
2369 }
2370
2371 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2372                      void *cookie, struct lustre_handle *lockh,
2373                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2374                      int errcode)
2375 {
2376         bool intent = *flags & LDLM_FL_HAS_INTENT;
2377         int rc;
2378         ENTRY;
2379
2380         /* The request was created before ldlm_cli_enqueue call. */
2381         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2382                 struct ldlm_reply *rep;
2383
2384                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2385                 LASSERT(rep != NULL);
2386
2387                 rep->lock_policy_res1 =
2388                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2389                 if (rep->lock_policy_res1)
2390                         errcode = rep->lock_policy_res1;
2391                 if (!speculative)
2392                         *flags |= LDLM_FL_LVB_READY;
2393         } else if (errcode == ELDLM_OK) {
2394                 *flags |= LDLM_FL_LVB_READY;
2395         }
2396
2397         /* Call the update callback. */
2398         rc = (*upcall)(cookie, lockh, errcode);
2399
2400         /* release the reference taken in ldlm_cli_enqueue() */
2401         if (errcode == ELDLM_LOCK_MATCHED)
2402                 errcode = ELDLM_OK;
2403         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2404                 ldlm_lock_decref(lockh, mode);
2405
2406         RETURN(rc);
2407 }
2408
2409 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2410                           void *args, int rc)
2411 {
2412         struct osc_enqueue_args *aa = args;
2413         struct ldlm_lock *lock;
2414         struct lustre_handle *lockh = &aa->oa_lockh;
2415         enum ldlm_mode mode = aa->oa_mode;
2416         struct ost_lvb *lvb = aa->oa_lvb;
2417         __u32 lvb_len = sizeof(*lvb);
2418         __u64 flags = 0;
2419
2420         ENTRY;
2421
2422         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2423          * be valid. */
2424         lock = ldlm_handle2lock(lockh);
2425         LASSERTF(lock != NULL,
2426                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2427                  lockh->cookie, req, aa);
2428
2429         /* Take an additional reference so that a blocking AST that
2430          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2431          * to arrive after an upcall has been executed by
2432          * osc_enqueue_fini(). */
2433         ldlm_lock_addref(lockh, mode);
2434
2435         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2436         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2437
2438         /* Let the CP AST grant the lock first. */
2439         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2440
2441         if (aa->oa_speculative) {
2442                 LASSERT(aa->oa_lvb == NULL);
2443                 LASSERT(aa->oa_flags == NULL);
2444                 aa->oa_flags = &flags;
2445         }
2446
2447         /* Complete obtaining the lock procedure. */
2448         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2449                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2450                                    lockh, rc);
2451         /* Complete osc stuff. */
2452         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2453                               aa->oa_flags, aa->oa_speculative, rc);
2454
2455         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2456
2457         ldlm_lock_decref(lockh, mode);
2458         LDLM_LOCK_PUT(lock);
2459         RETURN(rc);
2460 }
2461
2462 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2463  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2464  * other synchronous requests; however, holding some locks while trying to
2465  * obtain others may take a considerable amount of time in the case of an OST
2466  * failure, and when a client does not release the locks that other sync
2467  * requests are waiting for, the client is evicted from the cluster -- such
2468  * scenarios make life difficult, so release locks just after they are obtained. */
2469 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2470                      __u64 *flags, union ldlm_policy_data *policy,
2471                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2472                      void *cookie, struct ldlm_enqueue_info *einfo,
2473                      struct ptlrpc_request_set *rqset, int async,
2474                      bool speculative)
2475 {
2476         struct obd_device *obd = exp->exp_obd;
2477         struct lustre_handle lockh = { 0 };
2478         struct ptlrpc_request *req = NULL;
2479         int intent = *flags & LDLM_FL_HAS_INTENT;
2480         __u64 match_flags = *flags;
2481         enum ldlm_mode mode;
2482         int rc;
2483         ENTRY;
2484
2485         /* Filesystem lock extents are extended to page boundaries so that
2486          * dealing with the page cache is a little smoother.  */
2487         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2488         policy->l_extent.end |= ~PAGE_MASK;
2489
2490         /* Next, search for already existing extent locks that will cover us */
2491         /* If we're trying to read, we also search for an existing PW lock.  The
2492          * VFS and page cache already protect us locally, so lots of readers/
2493          * writers can share a single PW lock.
2494          *
2495          * There are problems with conversion deadlocks, so instead of
2496          * converting a read lock to a write lock, we'll just enqueue a new
2497          * one.
2498          *
2499          * At some point we should cancel the read lock instead of making them
2500          * send us a blocking callback, but there are problems with canceling
2501          * locks out from other users right now, too. */
2502         mode = einfo->ei_mode;
2503         if (einfo->ei_mode == LCK_PR)
2504                 mode |= LCK_PW;
2505         /* Normal lock requests must wait for the LVB to be ready before
2506          * matching a lock; speculative lock requests do not need to,
2507          * because they will not actually use the lock. */
2508         if (!speculative)
2509                 match_flags |= LDLM_FL_LVB_READY;
2510         if (intent != 0)
2511                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2512         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2513                                einfo->ei_type, policy, mode, &lockh, 0);
2514         if (mode) {
2515                 struct ldlm_lock *matched;
2516
2517                 if (*flags & LDLM_FL_TEST_LOCK)
2518                         RETURN(ELDLM_OK);
2519
2520                 matched = ldlm_handle2lock(&lockh);
2521                 if (speculative) {
2522                         /* This DLM lock request is speculative, and does not
2523                          * have an associated IO request. Therefore if there
2524                          * is already a DLM lock, it will just inform the
2525                          * caller to cancel the request for this stripe. */
2526                         lock_res_and_lock(matched);
2527                         if (ldlm_extent_equal(&policy->l_extent,
2528                             &matched->l_policy_data.l_extent))
2529                                 rc = -EEXIST;
2530                         else
2531                                 rc = -ECANCELED;
2532                         unlock_res_and_lock(matched);
2533
2534                         ldlm_lock_decref(&lockh, mode);
2535                         LDLM_LOCK_PUT(matched);
2536                         RETURN(rc);
2537                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2538                         *flags |= LDLM_FL_LVB_READY;
2539
2540                         /* We already have a lock, and it's referenced. */
2541                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2542
2543                         ldlm_lock_decref(&lockh, mode);
2544                         LDLM_LOCK_PUT(matched);
2545                         RETURN(ELDLM_OK);
2546                 } else {
2547                         ldlm_lock_decref(&lockh, mode);
2548                         LDLM_LOCK_PUT(matched);
2549                 }
2550         }
2551
2552         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2553                 RETURN(-ENOLCK);
2554
2555         if (intent) {
2556                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2557                                            &RQF_LDLM_ENQUEUE_LVB);
2558                 if (req == NULL)
2559                         RETURN(-ENOMEM);
2560
2561                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2562                 if (rc) {
2563                         ptlrpc_request_free(req);
2564                         RETURN(rc);
2565                 }
2566
2567                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2568                                      sizeof(*lvb));
2569                 ptlrpc_request_set_replen(req);
2570         }
2571
2572         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2573         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2574
2575         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2576                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2577         if (async) {
2578                 if (!rc) {
2579                         struct osc_enqueue_args *aa;
2580                         aa = ptlrpc_req_async_args(aa, req);
2581                         aa->oa_exp         = exp;
2582                         aa->oa_mode        = einfo->ei_mode;
2583                         aa->oa_type        = einfo->ei_type;
2584                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2585                         aa->oa_upcall      = upcall;
2586                         aa->oa_cookie      = cookie;
2587                         aa->oa_speculative = speculative;
2588                         if (!speculative) {
2589                                 aa->oa_flags  = flags;
2590                                 aa->oa_lvb    = lvb;
2591                         } else {
2592                                 /* speculative locks essentially enqueue a
2593                                  * DLM lock in advance, so we don't care
2594                                  * about the result of the enqueue. */
2595                                 aa->oa_lvb    = NULL;
2596                                 aa->oa_flags  = NULL;
2597                         }
2598
2599                         req->rq_interpret_reply = osc_enqueue_interpret;
2600                         ptlrpc_set_add_req(rqset, req);
2601                 } else if (intent) {
2602                         ptlrpc_req_finished(req);
2603                 }
2604                 RETURN(rc);
2605         }
2606
2607         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2608                               flags, speculative, rc);
2609         if (intent)
2610                 ptlrpc_req_finished(req);
2611
2612         RETURN(rc);
2613 }
2614
2615 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2616                    struct ldlm_res_id *res_id, enum ldlm_type type,
2617                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2618                    __u64 *flags, struct osc_object *obj,
2619                    struct lustre_handle *lockh, int unref)
2620 {
2621         struct obd_device *obd = exp->exp_obd;
2622         __u64 lflags = *flags;
2623         enum ldlm_mode rc;
2624         ENTRY;
2625
2626         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2627                 RETURN(-EIO);
2628
2629         /* Filesystem lock extents are extended to page boundaries so that
2630          * dealing with the page cache is a little smoother */
2631         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2632         policy->l_extent.end |= ~PAGE_MASK;
2633
2634         /* Next, search for already existing extent locks that will cover us */
2635         /* If we're trying to read, we also search for an existing PW lock.  The
2636          * VFS and page cache already protect us locally, so lots of readers/
2637          * writers can share a single PW lock. */
2638         rc = mode;
2639         if (mode == LCK_PR)
2640                 rc |= LCK_PW;
2641         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2642                              res_id, type, policy, rc, lockh, unref);
2643         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2644                 RETURN(rc);
2645
2646         if (obj != NULL) {
2647                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2648
2649                 LASSERT(lock != NULL);
2650                 if (osc_set_lock_data(lock, obj)) {
2651                         lock_res_and_lock(lock);
2652                         if (!ldlm_is_lvb_cached(lock)) {
2653                                 LASSERT(lock->l_ast_data == obj);
2654                                 osc_lock_lvb_update(env, obj, lock, NULL);
2655                                 ldlm_set_lvb_cached(lock);
2656                         }
2657                         unlock_res_and_lock(lock);
2658                 } else {
2659                         ldlm_lock_decref(lockh, rc);
2660                         rc = 0;
2661                 }
2662                 LDLM_LOCK_PUT(lock);
2663         }
2664         RETURN(rc);
2665 }
2666
2667 static int osc_statfs_interpret(const struct lu_env *env,
2668                                 struct ptlrpc_request *req, void *args, int rc)
2669 {
2670         struct osc_async_args *aa = args;
2671         struct obd_statfs *msfs;
2672
2673         ENTRY;
2674         if (rc == -EBADR)
2675                 /*
2676                  * The request has in fact never been sent due to issues at
2677                  * a higher level (LOV).  Exit immediately since the caller
2678                  * is aware of the problem and takes care of the clean up.
2679                  */
2680                 RETURN(rc);
2681
2682         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2683             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2684                 GOTO(out, rc = 0);
2685
2686         if (rc != 0)
2687                 GOTO(out, rc);
2688
2689         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2690         if (msfs == NULL)
2691                 GOTO(out, rc = -EPROTO);
2692
2693         *aa->aa_oi->oi_osfs = *msfs;
2694 out:
2695         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2696
2697         RETURN(rc);
2698 }
2699
2700 static int osc_statfs_async(struct obd_export *exp,
2701                             struct obd_info *oinfo, time64_t max_age,
2702                             struct ptlrpc_request_set *rqset)
2703 {
2704         struct obd_device     *obd = class_exp2obd(exp);
2705         struct ptlrpc_request *req;
2706         struct osc_async_args *aa;
2707         int rc;
2708         ENTRY;
2709
2710         if (obd->obd_osfs_age >= max_age) {
2711                 CDEBUG(D_SUPER,
2712                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2713                        obd->obd_name, &obd->obd_osfs,
2714                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2715                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2716                 spin_lock(&obd->obd_osfs_lock);
2717                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2718                 spin_unlock(&obd->obd_osfs_lock);
2719                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2720                 if (oinfo->oi_cb_up)
2721                         oinfo->oi_cb_up(oinfo, 0);
2722
2723                 RETURN(0);
2724         }
2725
2726         /* We could possibly pass max_age in the request (as an absolute
2727          * timestamp or a "seconds.usec ago") so the target can avoid doing
2728          * extra calls into the filesystem if that isn't necessary (e.g.
2729          * during mount that would help a bit).  Having relative timestamps
2730          * is not so great if request processing is slow, while absolute
2731          * timestamps are not ideal because they need time synchronization. */
2732         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2733         if (req == NULL)
2734                 RETURN(-ENOMEM);
2735
2736         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2737         if (rc) {
2738                 ptlrpc_request_free(req);
2739                 RETURN(rc);
2740         }
2741         ptlrpc_request_set_replen(req);
2742         req->rq_request_portal = OST_CREATE_PORTAL;
2743         ptlrpc_at_set_req_timeout(req);
2744
2745         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2746                 /* procfs requests should not wait for stat to avoid deadlock */
2747                 req->rq_no_resend = 1;
2748                 req->rq_no_delay = 1;
2749         }
2750
2751         req->rq_interpret_reply = osc_statfs_interpret;
2752         aa = ptlrpc_req_async_args(aa, req);
2753         aa->aa_oi = oinfo;
2754
2755         ptlrpc_set_add_req(rqset, req);
2756         RETURN(0);
2757 }
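
/*
 * Usage sketch (illustrative only; the callback name is a placeholder and
 * the exact ptlrpc_set_wait() signature varies between releases):
 *
 *   struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *   struct obd_info oi = { .oi_osfs = &osfs, .oi_cb_up = my_statfs_cb };
 *
 *   rc = osc_statfs_async(exp, &oi, max_age, set);
 *   if (rc == 0)
 *           rc = ptlrpc_set_wait(set); // completion runs osc_statfs_interpret
 *   ptlrpc_set_destroy(set);
 */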
2758
2759 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2760                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2761 {
2762         struct obd_device     *obd = class_exp2obd(exp);
2763         struct obd_statfs     *msfs;
2764         struct ptlrpc_request *req;
2765         struct obd_import     *imp = NULL;
2766         int rc;
2767         ENTRY;
2768
2769
2770         /* Since the request might also come from lprocfs, we need to
2771          * sync this with client_disconnect_export(); see Bug15684. */
2772         down_read(&obd->u.cli.cl_sem);
2773         if (obd->u.cli.cl_import)
2774                 imp = class_import_get(obd->u.cli.cl_import);
2775         up_read(&obd->u.cli.cl_sem);
2776         if (!imp)
2777                 RETURN(-ENODEV);
2778
2779         /* We could possibly pass max_age in the request (as an absolute
2780          * timestamp or a "seconds.usec ago") so the target can avoid doing
2781          * extra calls into the filesystem if that isn't necessary (e.g.
2782          * during mount that would help a bit).  Having relative timestamps
2783          * is not so great if request processing is slow, while absolute
2784          * timestamps are not ideal because they need time synchronization. */
2785         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2786
2787         class_import_put(imp);
2788
2789         if (req == NULL)
2790                 RETURN(-ENOMEM);
2791
2792         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2793         if (rc) {
2794                 ptlrpc_request_free(req);
2795                 RETURN(rc);
2796         }
2797         ptlrpc_request_set_replen(req);
2798         req->rq_request_portal = OST_CREATE_PORTAL;
2799         ptlrpc_at_set_req_timeout(req);
2800
2801         if (flags & OBD_STATFS_NODELAY) {
2802                 /* procfs requests should not wait for stat to avoid deadlock */
2803                 req->rq_no_resend = 1;
2804                 req->rq_no_delay = 1;
2805         }
2806
2807         rc = ptlrpc_queue_wait(req);
2808         if (rc)
2809                 GOTO(out, rc);
2810
2811         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2812         if (msfs == NULL)
2813                 GOTO(out, rc = -EPROTO);
2814
2815         *osfs = *msfs;
2816
2817         EXIT;
2818 out:
2819         ptlrpc_req_finished(req);
2820         return rc;
2821 }
2822
2823 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2824                          void *karg, void __user *uarg)
2825 {
2826         struct obd_device *obd = exp->exp_obd;
2827         struct obd_ioctl_data *data = karg;
2828         int rc = 0;
2829
2830         ENTRY;
2831         if (!try_module_get(THIS_MODULE)) {
2832                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2833                        module_name(THIS_MODULE));
2834                 return -EINVAL;
2835         }
2836         switch (cmd) {
2837         case OBD_IOC_CLIENT_RECOVER:
2838                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2839                                            data->ioc_inlbuf1, 0);
2840                 if (rc > 0)
2841                         rc = 0;
2842                 break;
2843         case IOC_OSC_SET_ACTIVE:
2844                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2845                                               data->ioc_offset);
2846                 break;
2847         default:
2848                 rc = -ENOTTY;
2849                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2850                        obd->obd_name, cmd, current->comm, rc);
2851                 break;
2852         }
2853
2854         module_put(THIS_MODULE);
2855         return rc;
2856 }
2857
2858 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2859                        u32 keylen, void *key, u32 vallen, void *val,
2860                        struct ptlrpc_request_set *set)
2861 {
2862         struct ptlrpc_request *req;
2863         struct obd_device     *obd = exp->exp_obd;
2864         struct obd_import     *imp = class_exp2cliimp(exp);
2865         char                  *tmp;
2866         int                    rc;
2867         ENTRY;
2868
2869         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2870
2871         if (KEY_IS(KEY_CHECKSUM)) {
2872                 if (vallen != sizeof(int))
2873                         RETURN(-EINVAL);
2874                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2875                 RETURN(0);
2876         }
2877
2878         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2879                 sptlrpc_conf_client_adapt(obd);
2880                 RETURN(0);
2881         }
2882
2883         if (KEY_IS(KEY_FLUSH_CTX)) {
2884                 sptlrpc_import_flush_my_ctx(imp);
2885                 RETURN(0);
2886         }
2887
2888         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2889                 struct client_obd *cli = &obd->u.cli;
2890                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2891                 long target = *(long *)val;
2892
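                /*
                 * Worked example (a sketch): with 1000 pages on the LRU
                 * list, nr = 500; a caller target of 200 shrinks
                 * min(500, 200) = 200 pages, and *val is decremented by the
                 * count actually freed so the caller can account for the
                 * remainder elsewhere.
                 */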
2893                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2894                 *(long *)val -= nr;
2895                 RETURN(0);
2896         }
2897
2898         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2899                 RETURN(-EINVAL);
2900
2901         /* We pass all other commands directly to the OST. Since nobody
2902            calls OSC methods directly and everybody is supposed to go
2903            through LOV, we assume LOV validated the values for us.
2904            The only recognised values so far are evict_by_nid and mds_conn.
2905            Even if something bad gets through, we'd get a -EINVAL from the
2906            OST anyway. */
2907
2908         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2909                                                 &RQF_OST_SET_GRANT_INFO :
2910                                                 &RQF_OBD_SET_INFO);
2911         if (req == NULL)
2912                 RETURN(-ENOMEM);
2913
2914         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2915                              RCL_CLIENT, keylen);
2916         if (!KEY_IS(KEY_GRANT_SHRINK))
2917                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2918                                      RCL_CLIENT, vallen);
2919         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2920         if (rc) {
2921                 ptlrpc_request_free(req);
2922                 RETURN(rc);
2923         }
2924
2925         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2926         memcpy(tmp, key, keylen);
2927         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2928                                                         &RMF_OST_BODY :
2929                                                         &RMF_SETINFO_VAL);
2930         memcpy(tmp, val, vallen);
2931
2932         if (KEY_IS(KEY_GRANT_SHRINK)) {
2933                 struct osc_grant_args *aa;
2934                 struct obdo *oa;
2935
2936                 aa = ptlrpc_req_async_args(aa, req);
2937                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2938                 if (!oa) {
2939                         ptlrpc_req_finished(req);
2940                         RETURN(-ENOMEM);
2941                 }
2942                 *oa = ((struct ost_body *)val)->oa;
2943                 aa->aa_oa = oa;
2944                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2945         }
2946
2947         ptlrpc_request_set_replen(req);
2948         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2949                 LASSERT(set != NULL);
2950                 ptlrpc_set_add_req(set, req);
2951                 ptlrpc_check_set(NULL, set);
2952         } else {
2953                 ptlrpcd_add_req(req);
2954         }
2955
2956         RETURN(0);
2957 }
2958 EXPORT_SYMBOL(osc_set_info_async);
2959
2960 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2961                   struct obd_device *obd, struct obd_uuid *cluuid,
2962                   struct obd_connect_data *data, void *localdata)
2963 {
2964         struct client_obd *cli = &obd->u.cli;
2965
2966         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2967                 long lost_grant;
2968                 long grant;
2969
2970                 spin_lock(&cli->cl_loi_list_lock);
2971                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2972                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2973                         /* restore ocd_grant_blkbits as client page bits */
2974                         data->ocd_grant_blkbits = PAGE_SHIFT;
2975                         grant += cli->cl_dirty_grant;
2976                 } else {
2977                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2978                 }
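                /* with no grant accumulated (grant == 0), fall back to asking
                 * for two full BRW RPCs worth of grant (gcc's "?:" extension) */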
2979                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2980                 lost_grant = cli->cl_lost_grant;
2981                 cli->cl_lost_grant = 0;
2982                 spin_unlock(&cli->cl_loi_list_lock);
2983
2984                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2985                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2986                        data->ocd_version, data->ocd_grant, lost_grant);
2987         }
2988
2989         RETURN(0);
2990 }
2991 EXPORT_SYMBOL(osc_reconnect);
2992
2993 int osc_disconnect(struct obd_export *exp)
2994 {
2995         struct obd_device *obd = class_exp2obd(exp);
2996         int rc;
2997
2998         rc = client_disconnect_export(exp);
2999         /**
3000          * Initially we put del_shrink_grant before disconnect_export, but it
3001          * causes the following problem if setup (connect) and cleanup
3002          * (disconnect) are tangled together.
3003          *      connect p1                     disconnect p2
3004          *   ptlrpc_connect_import
3005          *     ...............               class_manual_cleanup
3006          *                                     osc_disconnect
3007          *                                     del_shrink_grant
3008          *   ptlrpc_connect_interpret
3009          *     osc_init_grant
3010          *   add this client to shrink list
3011          *                                      cleanup_osc
3012          * Bang! the grant shrink thread triggers the shrink. BUG18662
3013          */
3014         osc_del_grant_list(&obd->u.cli);
3015         return rc;
3016 }
3017 EXPORT_SYMBOL(osc_disconnect);
3018
3019 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3020                                  struct hlist_node *hnode, void *arg)
3021 {
3022         struct lu_env *env = arg;
3023         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3024         struct ldlm_lock *lock;
3025         struct osc_object *osc = NULL;
3026         ENTRY;
3027
3028         lock_res(res);
3029         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3030                 if (lock->l_ast_data != NULL && osc == NULL) {
3031                         osc = lock->l_ast_data;
3032                         cl_object_get(osc2cl(osc));
3033                 }
3034
3035                 /* clear the LDLM_FL_CLEANED flag to make sure the lock
3036                  * will be canceled by the 2nd round of ldlm_namespace_cleanup()
3037                  * in osc_import_event(). */
3038                 ldlm_clear_cleaned(lock);
3039         }
3040         unlock_res(res);
3041
3042         if (osc != NULL) {
3043                 osc_object_invalidate(env, osc);
3044                 cl_object_put(env, osc2cl(osc));
3045         }
3046
3047         RETURN(0);
3048 }
3049 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3050
3051 static int osc_import_event(struct obd_device *obd,
3052                             struct obd_import *imp,
3053                             enum obd_import_event event)
3054 {
3055         struct client_obd *cli;
3056         int rc = 0;
3057
3058         ENTRY;
3059         LASSERT(imp->imp_obd == obd);
3060
3061         switch (event) {
3062         case IMP_EVENT_DISCON: {
3063                 cli = &obd->u.cli;
3064                 spin_lock(&cli->cl_loi_list_lock);
3065                 cli->cl_avail_grant = 0;
3066                 cli->cl_lost_grant = 0;
3067                 spin_unlock(&cli->cl_loi_list_lock);
3068                 break;
3069         }
3070         case IMP_EVENT_INACTIVE: {
3071                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3072                 break;
3073         }
3074         case IMP_EVENT_INVALIDATE: {
3075                 struct ldlm_namespace *ns = obd->obd_namespace;
3076                 struct lu_env         *env;
3077                 __u16                  refcheck;
3078
3079                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3080
3081                 env = cl_env_get(&refcheck);
3082                 if (!IS_ERR(env)) {
3083                         osc_io_unplug(env, &obd->u.cli, NULL);
3084
3085                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3086                                                  osc_ldlm_resource_invalidate,
3087                                                  env, 0);
3088                         cl_env_put(env, &refcheck);
3089
3090                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3091                 } else
3092                         rc = PTR_ERR(env);
3093                 break;
3094         }
3095         case IMP_EVENT_ACTIVE: {
3096                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3097                 break;
3098         }
3099         case IMP_EVENT_OCD: {
3100                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3101
3102                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3103                         osc_init_grant(&obd->u.cli, ocd);
3104
3105                 /* See bug 7198 */
3106                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3107                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3108
3109                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3110                 break;
3111         }
3112         case IMP_EVENT_DEACTIVATE: {
3113                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3114                 break;
3115         }
3116         case IMP_EVENT_ACTIVATE: {
3117                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3118                 break;
3119         }
3120         default:
3121                 CERROR("Unknown import event %d\n", event);
3122                 LBUG();
3123         }
3124         RETURN(rc);
3125 }
3126
3127 /**
3128  * Determine whether the lock can be canceled before replaying the lock
3129  * during recovery; see bug16774 for details.
3130  *
3131  * \retval zero the lock can't be canceled
3132  * \retval other ok to cancel
3133  */
3134 static int osc_cancel_weight(struct ldlm_lock *lock)
3135 {
3136         /*
3137          * Cancel all unused, granted extent locks.
3138          */
3139         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3140             ldlm_is_granted(lock) &&
3141             osc_ldlm_weigh_ast(lock) == 0)
3142                 RETURN(1);
3143
3144         RETURN(0);
3145 }
3146
3147 static int brw_queue_work(const struct lu_env *env, void *data)
3148 {
3149         struct client_obd *cli = data;
3150
3151         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3152
3153         osc_io_unplug(env, cli, NULL);
3154         RETURN(0);
3155 }
3156
3157 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3158 {
3159         struct client_obd *cli = &obd->u.cli;
3160         void *handler;
3161         int rc;
3162
3163         ENTRY;
3164
3165         rc = ptlrpcd_addref();
3166         if (rc)
3167                 RETURN(rc);
3168
3169         rc = client_obd_setup(obd, lcfg);
3170         if (rc)
3171                 GOTO(out_ptlrpcd, rc);
3172
3173
3174         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3175         if (IS_ERR(handler))
3176                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3177         cli->cl_writeback_work = handler;
3178
3179         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3180         if (IS_ERR(handler))
3181                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3182         cli->cl_lru_work = handler;
3183
3184         rc = osc_quota_setup(obd);
3185         if (rc)
3186                 GOTO(out_ptlrpcd_work, rc);
3187
3188         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3189         osc_update_next_shrink(cli);
3190
3191         RETURN(rc);
3192
3193 out_ptlrpcd_work:
3194         if (cli->cl_writeback_work != NULL) {
3195                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3196                 cli->cl_writeback_work = NULL;
3197         }
3198         if (cli->cl_lru_work != NULL) {
3199                 ptlrpcd_destroy_work(cli->cl_lru_work);
3200                 cli->cl_lru_work = NULL;
3201         }
3202         client_obd_cleanup(obd);
3203 out_ptlrpcd:
3204         ptlrpcd_decref();
3205         RETURN(rc);
3206 }
3207 EXPORT_SYMBOL(osc_setup_common);
3208
3209 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3210 {
3211         struct client_obd *cli = &obd->u.cli;
3212         int                adding;
3213         int                added;
3214         int                req_count;
3215         int                rc;
3216
3217         ENTRY;
3218
3219         rc = osc_setup_common(obd, lcfg);
3220         if (rc < 0)
3221                 RETURN(rc);
3222
3223         rc = osc_tunables_init(obd);
3224         if (rc)
3225                 RETURN(rc);
3226
3227         /*
3228          * We try to control the total number of requests with an upper
3229          * limit, osc_reqpool_maxreqcount. A race may let the allocation
3230          * go slightly over the limit, but that is fine.
3231          */
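        /*
         * For example (a sketch): with osc_reqpool_maxreqcount = 100, 95
         * requests already counted and cl_max_rpcs_in_flight = 8, adding
         * becomes 8 + 2 = 10 and is clamped to 100 - 95 = 5; the counter
         * then grows by however many requests ptlrpc_add_rqs_to_pool()
         * actually managed to allocate.
         */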
3232         req_count = atomic_read(&osc_pool_req_count);
3233         if (req_count < osc_reqpool_maxreqcount) {
3234                 adding = cli->cl_max_rpcs_in_flight + 2;
3235                 if (req_count + adding > osc_reqpool_maxreqcount)
3236                         adding = osc_reqpool_maxreqcount - req_count;
3237
3238                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3239                 atomic_add(added, &osc_pool_req_count);
3240         }
3241
3242         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3243
3244         spin_lock(&osc_shrink_lock);
3245         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3246         spin_unlock(&osc_shrink_lock);
3247         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3248         cli->cl_import->imp_idle_debug = D_HA;
3249
3250         RETURN(0);
3251 }
3252
3253 int osc_precleanup_common(struct obd_device *obd)
3254 {
3255         struct client_obd *cli = &obd->u.cli;
3256         ENTRY;
3257
3258         /* LU-464
3259          * for echo client, export may be on zombie list, wait for
3260          * zombie thread to cull it, because cli.cl_import will be
3261          * cleared in client_disconnect_export():
3262          *   class_export_destroy() -> obd_cleanup() ->
3263          *   echo_device_free() -> echo_client_cleanup() ->
3264          *   obd_disconnect() -> osc_disconnect() ->
3265          *   client_disconnect_export()
3266          */
3267         obd_zombie_barrier();
3268         if (cli->cl_writeback_work) {
3269                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3270                 cli->cl_writeback_work = NULL;
3271         }
3272
3273         if (cli->cl_lru_work) {
3274                 ptlrpcd_destroy_work(cli->cl_lru_work);
3275                 cli->cl_lru_work = NULL;
3276         }
3277
3278         obd_cleanup_client_import(obd);
3279         RETURN(0);
3280 }
3281 EXPORT_SYMBOL(osc_precleanup_common);
3282
3283 static int osc_precleanup(struct obd_device *obd)
3284 {
3285         ENTRY;
3286
3287         osc_precleanup_common(obd);
3288
3289         ptlrpc_lprocfs_unregister_obd(obd);
3290         RETURN(0);
3291 }
3292
3293 int osc_cleanup_common(struct obd_device *obd)
3294 {
3295         struct client_obd *cli = &obd->u.cli;
3296         int rc;
3297
3298         ENTRY;
3299
3300         spin_lock(&osc_shrink_lock);
3301         list_del(&cli->cl_shrink_list);
3302         spin_unlock(&osc_shrink_lock);
3303
3304         /* lru cleanup */
3305         if (cli->cl_cache != NULL) {
3306                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3307                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3308                 list_del_init(&cli->cl_lru_osc);
3309                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3310                 cli->cl_lru_left = NULL;
3311                 cl_cache_decref(cli->cl_cache);
3312                 cli->cl_cache = NULL;
3313         }
3314
3315         /* free memory of osc quota cache */
3316         osc_quota_cleanup(obd);
3317
3318         rc = client_obd_cleanup(obd);
3319
3320         ptlrpcd_decref();
3321         RETURN(rc);
3322 }
3323 EXPORT_SYMBOL(osc_cleanup_common);
3324
3325 static const struct obd_ops osc_obd_ops = {
3326         .o_owner                = THIS_MODULE,
3327         .o_setup                = osc_setup,
3328         .o_precleanup           = osc_precleanup,
3329         .o_cleanup              = osc_cleanup_common,
3330         .o_add_conn             = client_import_add_conn,
3331         .o_del_conn             = client_import_del_conn,
3332         .o_connect              = client_connect_import,
3333         .o_reconnect            = osc_reconnect,
3334         .o_disconnect           = osc_disconnect,
3335         .o_statfs               = osc_statfs,
3336         .o_statfs_async         = osc_statfs_async,
3337         .o_create               = osc_create,
3338         .o_destroy              = osc_destroy,
3339         .o_getattr              = osc_getattr,
3340         .o_setattr              = osc_setattr,
3341         .o_iocontrol            = osc_iocontrol,
3342         .o_set_info_async       = osc_set_info_async,
3343         .o_import_event         = osc_import_event,
3344         .o_quotactl             = osc_quotactl,
3345 };
3346
3347 static struct shrinker *osc_cache_shrinker;
3348 LIST_HEAD(osc_shrink_list);
3349 DEFINE_SPINLOCK(osc_shrink_lock);
3350
3351 #ifndef HAVE_SHRINKER_COUNT
3352 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3353 {
3354         struct shrink_control scv = {
3355                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3356                 .gfp_mask   = shrink_param(sc, gfp_mask)
3357         };
3358         (void)osc_cache_shrink_scan(shrinker, &scv);
3359
3360         return osc_cache_shrink_count(shrinker, &scv);
3361 }
3362 #endif
3363
3364 static int __init osc_init(void)
3365 {
3366         unsigned int reqpool_size;
3367         unsigned int reqsize;
3368         int rc;
3369         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3370                          osc_cache_shrink_count, osc_cache_shrink_scan);
3371         ENTRY;
3372
3373         /* print an address of _any_ initialized kernel symbol from this
3374          * module, to allow debugging with gdb that doesn't support data
3375          * symbols from modules. */
3376         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3377
3378         rc = lu_kmem_init(osc_caches);
3379         if (rc)
3380                 RETURN(rc);
3381
3382         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3383                                  LUSTRE_OSC_NAME, &osc_device_type);
3384         if (rc)
3385                 GOTO(out_kmem, rc);
3386
3387         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3388
3389         /* This is obviously far too much memory; the check only prevents overflow */
3390         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3391                 GOTO(out_type, rc = -EINVAL);
3392
3393         reqpool_size = osc_reqpool_mem_max << 20;
3394
3395         reqsize = 1;
3396         while (reqsize < OST_IO_MAXREQSIZE)
3397                 reqsize = reqsize << 1;
3398
3399         /*
3400          * We don't enlarge the request count in the OSC pool according to
3401          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3402          * after a normal allocation has failed, so a small OSC pool won't
3403          * cause much performance degradation in most cases.
3404          */
3405         osc_reqpool_maxreqcount = reqpool_size / reqsize;
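
        /*
         * Worked example (a sketch; the real OST_IO_MAXREQSIZE is
         * build-dependent): with the default osc_reqpool_mem_max = 5 the
         * budget is 5 << 20 = 5 MiB; if OST_IO_MAXREQSIZE were ~700 KiB,
         * reqsize rounds up to the next power of two, 1 MiB, so
         * osc_reqpool_maxreqcount = 5 MiB / 1 MiB = 5 requests.
         */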
3406
3407         atomic_set(&osc_pool_req_count, 0);
3408         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3409                                           ptlrpc_add_rqs_to_pool);
3410
3411         if (osc_rq_pool == NULL)
3412                 GOTO(out_type, rc = -ENOMEM);
3413
3414         rc = osc_start_grant_work();
3415         if (rc != 0)
3416                 GOTO(out_req_pool, rc);
3417
3418         RETURN(rc);
3419
3420 out_req_pool:
3421         ptlrpc_free_rq_pool(osc_rq_pool);
3422 out_type:
3423         class_unregister_type(LUSTRE_OSC_NAME);
3424 out_kmem:
3425         lu_kmem_fini(osc_caches);
3426
3427         RETURN(rc);
3428 }
3429
3430 static void __exit osc_exit(void)
3431 {
3432         osc_stop_grant_work();
3433         remove_shrinker(osc_cache_shrinker);
3434         class_unregister_type(LUSTRE_OSC_NAME);
3435         lu_kmem_fini(osc_caches);
3436         ptlrpc_free_rq_pool(osc_rq_pool);
3437 }
3438
3439 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3440 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3441 MODULE_VERSION(LUSTRE_VERSION_STRING);
3442 MODULE_LICENSE("GPL");
3443
3444 module_init(osc_init);
3445 module_exit(osc_exit);