lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

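/* Pack @oa into the request's OST body in wire format, using the import's
 * connection data to decide which fields are valid on the wire. */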
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

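/* Fetch the attributes of the object described by @oa from the OST with a
 * synchronous OST_GETATTR RPC and unpack the reply back into @oa; the block
 * size is filled in from the client's maximum BRW size. */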
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

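/* Synchronously apply the attributes in @oa to the object on the OST via an
 * OST_SETATTR RPC and refresh @oa from the reply. */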
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

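/* Reply interpreter for asynchronous setattr-style RPCs: unpack the reply
 * into sa_oa and pass the result to the caller's upcall. */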
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

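/* Asynchronous version of osc_setattr(): if @rqset is NULL the request is
 * fired and forgotten via ptlrpcd, otherwise the reply is handled by
 * osc_setattr_interpret() and reported through @upcall. */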
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

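/* Reply interpreter for OST_LADVISE: copy the returned obdo back to the
 * caller and report the result through the upcall. */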
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for a response. The upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

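/* Create an object on the OST with a synchronous OST_CREATE RPC; only used
 * for echo objects here, as asserted on the FID sequence. */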
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

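/* Send an OST_PUNCH (truncate) request asynchronously via ptlrpcd; the
 * result is delivered through osc_setattr_interpret() and @upcall. */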
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

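/* Reply interpreter for OST_SYNC: copy the returned obdo back and update
 * the osc object's blocks attribute from the reply. */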
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

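/* Send an OST_SYNC request for @obj; the start/end of the range to flush
 * are carried in the size and blocks fields of @oa. */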
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

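/* A destroy RPC completed: drop the in-flight count and wake up any waiter
 * throttled in osc_destroy(). */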
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

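/* Returns 1 if another destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight, after reserving a slot; returns 0 otherwise. */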
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

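/* Destroy an object on the OST: cancel matching local PW locks in advance
 * (ELC), throttle against cl_max_rpcs_in_flight, then send OST_DESTROY
 * without waiting for the reply. */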
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

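/* Fill the dirty accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) so the server can adjust this client's grant. */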
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, thus they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

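/* Schedule the next grant shrink attempt one shrink interval from now. */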
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

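/* Reply interpreter for a grant shrink RPC: on failure put the grant being
 * returned back into cl_avail_grant, otherwise apply any grant from the
 * reply, then free the obdo. */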
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
        oa = NULL;
        return rc;
}

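/* Carve a quarter of the available grant into @oa and flag it with
 * OBD_FL_SHRINK_GRANT so it is returned to the server piggybacked on an
 * outgoing BRW request. */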
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

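/* Decide whether it is time to return unused grant to the server: only if
 * the import supports and allows grant shrinking, the shrink deadline has
 * (nearly) passed, and more than one RPC worth of grant is held. */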
static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

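/* Delayed work that walks the registered clients, shrinks grant where
 * needed (up to GRANT_SHRINK_RPC_BATCH RPCs per pass) and re-arms itself
 * for the earliest upcoming shrink deadline. */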
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli))
                        osc_shrink_grant(cli);

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds())
                schedule_delayed_work(&work, msecs_to_jiffies(
                                        (next_shrink - ktime_get_seconds()) *
                                        MSEC_PER_SEC));
        else
                schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

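/* Verify the per-niobuf return codes in a BRW_WRITE reply and that the
 * bulk transfer moved exactly the requested number of bytes. */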
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

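/* Two brw_pages can share one niobuf only if they are contiguous and their
 * flags match (modulo flags that are known to be safe to combine). */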
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
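/* Compute a T10-PI checksum over the pages: generate per-sector DIF guard
 * tags with @fn, then hash the collected tags into a single checksum. */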
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

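/* Compute a plain bulk checksum over the pages using the hash algorithm
 * matching @cksum_type. */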
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

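/* Checksum dispatcher: use the T10-PI path if @cksum_type maps to a DIF
 * checksum function, otherwise fall back to a plain bulk checksum. */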
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

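/* Build an OST_READ/OST_WRITE request for @pga: merge contiguous pages into
 * niobufs, use a short i/o buffer instead of bulk when the transfer is small
 * enough, and attach checksums for writes when enabled. */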
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, the
         * oa contains valid o_uid and o_gid in these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid
         * breaking other process logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
1408         if (desc != NULL)
1409                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1410         else /* short io */
1411                 ioobj_max_brw_set(ioobj, 0);
1412
1413         if (short_io_size != 0) {
1414                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1415                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1416                         body->oa.o_flags = 0;
1417                 }
1418                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1419                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1420                        short_io_size);
1421                 if (opc == OST_WRITE) {
1422                         short_io_buf = req_capsule_client_get(pill,
1423                                                               &RMF_SHORT_IO);
1424                         LASSERT(short_io_buf != NULL);
1425                 }
1426         }
1427
1428         LASSERT(page_count > 0);
1429         pg_prev = pga[0];
1430         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1431                 struct brw_page *pg = pga[i];
1432                 int poff = pg->off & ~PAGE_MASK;
1433
1434                 LASSERT(pg->count > 0);
1435                 /* make sure there is no gap in the middle of page array */
1436                 LASSERTF(page_count == 1 ||
1437                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1438                           ergo(i > 0 && i < page_count - 1,
1439                                poff == 0 && pg->count == PAGE_SIZE)   &&
1440                           ergo(i == page_count - 1, poff == 0)),
1441                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1442                          i, page_count, pg, pg->off, pg->count);
1443                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1444                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1445                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1446                          i, page_count,
1447                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1448                          pg_prev->pg, page_private(pg_prev->pg),
1449                          pg_prev->pg->index, pg_prev->off);
1450                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1451                         (pg->flag & OBD_BRW_SRVLOCK));
1452                 if (short_io_size != 0 && opc == OST_WRITE) {
1453                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1454
1455                         LASSERT(short_io_size >= requested_nob + pg->count);
1456                         memcpy(short_io_buf + requested_nob,
1457                                ptr + poff,
1458                                pg->count);
1459                         ll_kunmap_atomic(ptr, KM_USER0);
1460                 } else if (short_io_size == 0) {
1461                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1462                                                          pg->count);
1463                 }
1464                 requested_nob += pg->count;
1465
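                     /* Coalesce pages that are contiguous in the object into
                      * one remote niobuf: e.g. two adjacent 4KiB pages become
                      * a single 8KiB rnb, so the server sees fewer, larger IO
                      * segments (see can_merge_pages()). */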
1466                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1467                         niobuf--;
1468                         niobuf->rnb_len += pg->count;
1469                 } else {
1470                         niobuf->rnb_offset = pg->off;
1471                         niobuf->rnb_len    = pg->count;
1472                         niobuf->rnb_flags  = pg->flag;
1473                 }
1474                 pg_prev = pg;
1475         }
1476
1477         LASSERTF((void *)(niobuf - niocount) ==
1478                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1479                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1480                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1481
1482         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1483         if (resend) {
1484                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1485                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1486                         body->oa.o_flags = 0;
1487                 }
1488                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1489         }
1490
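             /* If the client is holding more grant than it needs, presumably
              * hand some of it back to the OST by piggybacking on this BRW
              * rather than in a separate RPC (see osc_shrink_grant_local()). */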
1491         if (osc_should_shrink_grant(cli))
1492                 osc_shrink_grant_local(cli, &body->oa);
1493
1494         /* size[REQ_REC_OFF] still sizeof (*body) */
1495         if (opc == OST_WRITE) {
1496                 if (cli->cl_checksum &&
1497                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1498                         /* store cl_cksum_type in a local variable since
1499                          * it can be changed via lprocfs */
1500                         enum cksum_types cksum_type = cli->cl_cksum_type;
1501
1502                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1503                                 body->oa.o_flags = 0;
1504
1505                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1506                                                                 cksum_type);
1507                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1508
1509                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1510                                                   requested_nob, page_count,
1511                                                   pga, OST_WRITE,
1512                                                   &body->oa.o_cksum);
1513                         if (rc < 0) {
1514                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1515                                        rc);
1516                                 GOTO(out, rc);
1517                         }
1518                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1519                                body->oa.o_cksum);
1520
1521                         /* save this in 'oa', too, for later checking */
1522                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1523                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1524                                                            cksum_type);
1525                 } else {
1526                         /* clear out the checksum flag, in case this is a
1527                          * resend but cl_checksum is no longer set. b=11238 */
1528                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1529                 }
1530                 oa->o_cksum = body->oa.o_cksum;
1531                 /* 1 RC per niobuf */
1532                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1533                                      sizeof(__u32) * niocount);
1534         } else {
1535                 if (cli->cl_checksum &&
1536                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1537                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1538                                 body->oa.o_flags = 0;
1539                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1540                                 cli->cl_cksum_type);
1541                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1542                 }
1543
1544                 /* The client cksum has already been copied to the wire obdo
1545                  * by the earlier lustre_set_wire_obdo(); when a bulk read is
1546                  * being resent due to a cksum error, this allows the server
1547                  * to check and dump the pages on its side */
1548         }
1549         ptlrpc_request_set_replen(req);
1550
1551         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1552         aa = ptlrpc_req_async_args(req);
1553         aa->aa_oa = oa;
1554         aa->aa_requested_nob = requested_nob;
1555         aa->aa_nio_count = niocount;
1556         aa->aa_page_count = page_count;
1557         aa->aa_resends = 0;
1558         aa->aa_ppga = pga;
1559         aa->aa_cli = cli;
1560         INIT_LIST_HEAD(&aa->aa_oaps);
1561
1562         *reqp = req;
1563         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1564         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1565                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1566                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1567         RETURN(0);
1568
1569  out:
1570         ptlrpc_req_finished(req);
1571         RETURN(rc);
1572 }
1573
1574 char dbgcksum_file_name[PATH_MAX];
1575
1576 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1577                                 struct brw_page **pga, __u32 server_cksum,
1578                                 __u32 client_cksum)
1579 {
1580         struct file *filp;
1581         int rc, i;
1582         unsigned int len;
1583         char *buf;
1584
1585         /* Only keep a dump of the pages on the first error for a given range
1586          * in the file/fid; on resends/retries the O_EXCL open below sees -EEXIST. */
1587         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1588                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1589                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1590                   libcfs_debug_file_path_arr :
1591                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1592                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1593                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1594                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1595                  pga[0]->off,
1596                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1597                  client_cksum, server_cksum);
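             /* Illustrative resulting name, with purely hypothetical values:
              * /tmp/lustre-log-checksum_dump-osc-[0x200000bd0:0x1:0x0]:[0-1048575]-5a4e3c2b-1f0e9d8c */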
1598         filp = filp_open(dbgcksum_file_name,
1599                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1600         if (IS_ERR(filp)) {
1601                 rc = PTR_ERR(filp);
1602                 if (rc == -EEXIST)
1603                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1604                                "checksum error: rc = %d\n", dbgcksum_file_name,
1605                                rc);
1606                 else
1607                         CERROR("%s: can't open to dump pages with checksum "
1608                                "error: rc = %d\n", dbgcksum_file_name, rc);
1609                 return;
1610         }
1611
1612         for (i = 0; i < page_count; i++) {
1613                 len = pga[i]->count;
1614                 buf = kmap(pga[i]->pg);
1615                 while (len != 0) {
1616                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1617                         if (rc < 0) {
1618                                 CERROR("%s: wanted to write %u but got %d "
1619                                        "error\n", dbgcksum_file_name, len, rc);
1620                                 break;
1621                         }
1622                         len -= rc;
1623                         buf += rc;
1624                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1625                                dbgcksum_file_name, rc);
1626                 }
1627                 kunmap(pga[i]->pg);
1628         }
1629
1630         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1631         if (rc)
1632                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1633         filp_close(filp, NULL);
1634         return;
1635 }
1636
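     /* Compare the client-side and server-returned write checksums; on
      * mismatch, recompute the checksum over the still-mapped pages to
      * guess where the data changed, log a console error, and return 1
      * so the caller can retry the bulk.  Returns 0 when they match. */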
1637 static int
1638 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1639                      __u32 client_cksum, __u32 server_cksum,
1640                      struct osc_brw_async_args *aa)
1641 {
1642         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1643         enum cksum_types cksum_type;
1644         obd_dif_csum_fn *fn = NULL;
1645         int sector_size = 0;
1646         __u32 new_cksum;
1647         char *msg;
1648         int rc;
1649
1650         if (server_cksum == client_cksum) {
1651                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1652                 return 0;
1653         }
1654
1655         if (aa->aa_cli->cl_checksum_dump)
1656                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1657                                     server_cksum, client_cksum);
1658
1659         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1660                                            oa->o_flags : 0);
1661
1662         switch (cksum_type) {
1663         case OBD_CKSUM_T10IP512:
1664                 fn = obd_dif_ip_fn;
1665                 sector_size = 512;
1666                 break;
1667         case OBD_CKSUM_T10IP4K:
1668                 fn = obd_dif_ip_fn;
1669                 sector_size = 4096;
1670                 break;
1671         case OBD_CKSUM_T10CRC512:
1672                 fn = obd_dif_crc_fn;
1673                 sector_size = 512;
1674                 break;
1675         case OBD_CKSUM_T10CRC4K:
1676                 fn = obd_dif_crc_fn;
1677                 sector_size = 4096;
1678                 break;
1679         default:
1680                 break;
1681         }
1682
1683         if (fn)
1684                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1685                                              aa->aa_page_count, aa->aa_ppga,
1686                                              OST_WRITE, fn, sector_size,
1687                                              &new_cksum);
1688         else
1689                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1690                                        aa->aa_ppga, OST_WRITE, cksum_type,
1691                                        &new_cksum);
1692
1693         if (rc < 0)
1694                 msg = "failed to calculate the client write checksum";
1695         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1696                 msg = "the server did not use the checksum type specified in "
1697                       "the original request - likely a protocol problem";
1698         else if (new_cksum == server_cksum)
1699                 msg = "changed on the client after we checksummed it - "
1700                       "likely false positive due to mmap IO (bug 11742)";
1701         else if (new_cksum == client_cksum)
1702                 msg = "changed in transit before arrival at OST";
1703         else
1704                 msg = "changed in transit AND doesn't match the original - "
1705                       "likely false positive due to mmap IO (bug 11742)";
1706
1707         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1708                            DFID " object "DOSTID" extent [%llu-%llu], original "
1709                            "client csum %x (type %x), server csum %x (type %x),"
1710                            " client csum now %x\n",
1711                            obd_name, msg, libcfs_nid2str(peer->nid),
1712                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1713                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1714                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1715                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1716                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1717                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1718                            client_cksum,
1719                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1720                            server_cksum, cksum_type, new_cksum);
1721         return 1;
1722 }
1723
1724 /* Note rc enters this function as number of bytes transferred */
1725 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1726 {
1727         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1728         struct client_obd *cli = aa->aa_cli;
1729         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1730         const struct lnet_process_id *peer =
1731                 &req->rq_import->imp_connection->c_peer;
1732         struct ost_body *body;
1733         u32 client_cksum = 0;
1734         ENTRY;
1735
1736         if (rc < 0 && rc != -EDQUOT) {
1737                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1738                 RETURN(rc);
1739         }
1740
1741         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1742         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1743         if (body == NULL) {
1744                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1745                 RETURN(-EPROTO);
1746         }
1747
1748         /* set/clear over quota flag for a uid/gid/projid */
1749         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1750             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1751                 unsigned qid[LL_MAXQUOTAS] = {
1752                                          body->oa.o_uid, body->oa.o_gid,
1753                                          body->oa.o_projid };
1754                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1755                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1756                        body->oa.o_valid, body->oa.o_flags);
1757                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1758                                 body->oa.o_flags);
1759         }
1760
1761         osc_update_grant(cli, body);
1762
1763         if (rc < 0)
1764                 RETURN(rc);
1765
1766         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1767                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1768
1769         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1770                 if (rc > 0) {
1771                         CERROR("Unexpected +ve rc %d\n", rc);
1772                         RETURN(-EPROTO);
1773                 }
1774
1775                 if (req->rq_bulk != NULL &&
1776                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1777                         RETURN(-EAGAIN);
1778
1779                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1780                     check_write_checksum(&body->oa, peer, client_cksum,
1781                                          body->oa.o_cksum, aa))
1782                         RETURN(-EAGAIN);
1783
1784                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1785                                      aa->aa_page_count, aa->aa_ppga);
1786                 GOTO(out, rc);
1787         }
1788
1789         /* The rest of this function executes only for OST_READs */
1790
1791         if (req->rq_bulk == NULL) {
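                     /* short io: the server replied inline, so the number of
                      * bytes "transferred" is the size of the short io buffer
                      * it actually filled */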
1792                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1793                                           RCL_SERVER);
1794                 LASSERT(rc == req->rq_status);
1795         } else {
1796                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1797                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1798         }
1799         if (rc < 0)
1800                 GOTO(out, rc = -EAGAIN);
1801
1802         if (rc > aa->aa_requested_nob) {
1803                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1804                        aa->aa_requested_nob);
1805                 RETURN(-EPROTO);
1806         }
1807
1808         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1809                 CERROR("Unexpected rc %d (%d transferred)\n",
1810                        rc, req->rq_bulk->bd_nob_transferred);
1811                 RETURN(-EPROTO);
1812         }
1813
1814         if (req->rq_bulk == NULL) {
1815                 /* short io */
1816                 int nob, pg_count, i = 0;
1817                 unsigned char *buf;
1818
1819                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1820                 pg_count = aa->aa_page_count;
1821                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1822                                                    rc);
1823                 nob = rc;
1824                 while (nob > 0 && pg_count > 0) {
1825                         unsigned char *ptr;
1826                         int count = aa->aa_ppga[i]->count > nob ?
1827                                     nob : aa->aa_ppga[i]->count;
1828
1829                         CDEBUG(D_CACHE, "page %p count %d\n",
1830                                aa->aa_ppga[i]->pg, count);
1831                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1832                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1833                                count);
1834                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1835
1836                         buf += count;
1837                         nob -= count;
1838                         i++;
1839                         pg_count--;
1840                 }
1841         }
1842
1843         if (rc < aa->aa_requested_nob)
1844                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1845
1846         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1847                 static int cksum_counter;
1848                 u32        server_cksum = body->oa.o_cksum;
1849                 char      *via = "";
1850                 char      *router = "";
1851                 enum cksum_types cksum_type;
1852                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1853                         body->oa.o_flags : 0;
1854
1855                 cksum_type = obd_cksum_type_unpack(o_flags);
1856                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1857                                           aa->aa_page_count, aa->aa_ppga,
1858                                           OST_READ, &client_cksum);
1859                 if (rc < 0)
1860                         GOTO(out, rc);
1861
1862                 if (req->rq_bulk != NULL &&
1863                     peer->nid != req->rq_bulk->bd_sender) {
1864                         via = " via ";
1865                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1866                 }
1867
1868                 if (server_cksum != client_cksum) {
1869                         struct ost_body *clbody;
1870                         u32 page_count = aa->aa_page_count;
1871
1872                         clbody = req_capsule_client_get(&req->rq_pill,
1873                                                         &RMF_OST_BODY);
1874                         if (cli->cl_checksum_dump)
1875                                 dump_all_bulk_pages(&clbody->oa, page_count,
1876                                                     aa->aa_ppga, server_cksum,
1877                                                     client_cksum);
1878
1879                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1880                                            "%s%s%s inode "DFID" object "DOSTID
1881                                            " extent [%llu-%llu], client %x, "
1882                                            "server %x, cksum_type %x\n",
1883                                            obd_name,
1884                                            libcfs_nid2str(peer->nid),
1885                                            via, router,
1886                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1887                                                 clbody->oa.o_parent_seq : 0ULL,
1888                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1889                                                 clbody->oa.o_parent_oid : 0,
1890                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1891                                                 clbody->oa.o_parent_ver : 0,
1892                                            POSTID(&body->oa.o_oi),
1893                                            aa->aa_ppga[0]->off,
1894                                            aa->aa_ppga[page_count-1]->off +
1895                                            aa->aa_ppga[page_count-1]->count - 1,
1896                                            client_cksum, server_cksum,
1897                                            cksum_type);
1898                         cksum_counter = 0;
1899                         aa->aa_oa->o_cksum = client_cksum;
1900                         rc = -EAGAIN;
1901                 } else {
1902                         cksum_counter++;
1903                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1904                         rc = 0;
1905                 }
1906         } else if (unlikely(client_cksum)) {
1907                 static int cksum_missed;
1908
1909                 cksum_missed++;
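                     /* x & -x == x only when x is a power of two, so this
                      * logs on the 1st, 2nd, 4th, 8th, ... miss as a simple
                      * rate limiter */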
1910                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1911                         CERROR("Checksum %u requested from %s but not sent\n",
1912                                cksum_missed, libcfs_nid2str(peer->nid));
1913         } else {
1914                 rc = 0;
1915         }
1916 out:
1917         if (rc >= 0)
1918                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1919                                      aa->aa_oa, &body->oa);
1920
1921         RETURN(rc);
1922 }
1923
1924 static int osc_brw_redo_request(struct ptlrpc_request *request,
1925                                 struct osc_brw_async_args *aa, int rc)
1926 {
1927         struct ptlrpc_request *new_req;
1928         struct osc_brw_async_args *new_aa;
1929         struct osc_async_page *oap;
1930         ENTRY;
1931
1932         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1933                   "redo for recoverable error %d", rc);
1934
1935         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1936                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1937                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1938                                   aa->aa_ppga, &new_req, 1);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1943                 if (oap->oap_request != NULL) {
1944                         LASSERTF(request == oap->oap_request,
1945                                  "request %p != oap_request %p\n",
1946                                  request, oap->oap_request);
1947                         if (oap->oap_interrupted) {
1948                                 ptlrpc_req_finished(new_req);
1949                                 RETURN(-EINTR);
1950                         }
1951                 }
1952         }
1953         /* New request takes over pga and oaps from old request.
1954          * Note that copying a list_head doesn't work, need to move it... */
1955         aa->aa_resends++;
1956         new_req->rq_interpret_reply = request->rq_interpret_reply;
1957         new_req->rq_async_args = request->rq_async_args;
1958         new_req->rq_commit_cb = request->rq_commit_cb;
1959         /* cap resend delay to the current request timeout, this is similar to
1960          * what ptlrpc does (see after_reply()) */
1961         if (aa->aa_resends > new_req->rq_timeout)
1962                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1963         else
1964                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1965         new_req->rq_generation_set = 1;
1966         new_req->rq_import_generation = request->rq_import_generation;
1967
1968         new_aa = ptlrpc_req_async_args(new_req);
1969
1970         INIT_LIST_HEAD(&new_aa->aa_oaps);
1971         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1972         INIT_LIST_HEAD(&new_aa->aa_exts);
1973         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1974         new_aa->aa_resends = aa->aa_resends;
1975
1976         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1977                 if (oap->oap_request) {
1978                         ptlrpc_req_finished(oap->oap_request);
1979                         oap->oap_request = ptlrpc_request_addref(new_req);
1980                 }
1981         }
1982
1983         /* XXX: This code will run into problems if we ever support adding
1984          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1985          * waiting for all of them to finish. We should inherit the request
1986          * set from the old request. */
1987         ptlrpcd_add_req(new_req);
1988
1989         DEBUG_REQ(D_INFO, new_req, "new request");
1990         RETURN(0);
1991 }
1992
1993 /*
1994  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1995  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1996  * fine for our small page arrays and doesn't require allocation.  It's an
1997  * insertion sort that swaps elements that are strides apart, shrinking the
1998  * stride down until it's 1 and the array is sorted.
1999  */
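     /* The stride loop below builds the 3h+1 increment sequence 1, 4, 13,
      * 40, ... and the sort then runs with those strides in decreasing
      * order. */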
2000 static void sort_brw_pages(struct brw_page **array, int num)
2001 {
2002         int stride, i, j;
2003         struct brw_page *tmp;
2004
2005         if (num == 1)
2006                 return;
2007         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2008                 ;
2009
2010         do {
2011                 stride /= 3;
2012                 for (i = stride ; i < num ; i++) {
2013                         tmp = array[i];
2014                         j = i;
2015                         while (j >= stride && array[j - stride]->off > tmp->off) {
2016                                 array[j] = array[j - stride];
2017                                 j -= stride;
2018                         }
2019                         array[j] = tmp;
2020                 }
2021         } while (stride > 1);
2022 }
2023
2024 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2025 {
2026         LASSERT(ppga != NULL);
2027         OBD_FREE(ppga, sizeof(*ppga) * count);
2028 }
2029
2030 static int brw_interpret(const struct lu_env *env,
2031                          struct ptlrpc_request *req, void *data, int rc)
2032 {
2033         struct osc_brw_async_args *aa = data;
2034         struct osc_extent *ext;
2035         struct osc_extent *tmp;
2036         struct client_obd *cli = aa->aa_cli;
2037         unsigned long           transferred = 0;
2038         ENTRY;
2039
2040         rc = osc_brw_fini_request(req, rc);
2041         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2042         /* When the server returns -EINPROGRESS, the client should always
2043          * retry, regardless of how many times the bulk was resent already. */
2044         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2045                 if (req->rq_import_generation !=
2046                     req->rq_import->imp_generation) {
2047                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2048                                DOSTID", rc = %d.\n",
2049                                req->rq_import->imp_obd->obd_name,
2050                                POSTID(&aa->aa_oa->o_oi), rc);
2051                 } else if (rc == -EINPROGRESS ||
2052                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2053                         rc = osc_brw_redo_request(req, aa, rc);
2054                 } else {
2055                         CERROR("%s: too many resent retries for object: "
2056                                "%llu:%llu, rc = %d.\n",
2057                                req->rq_import->imp_obd->obd_name,
2058                                POSTID(&aa->aa_oa->o_oi), rc);
2059                 }
2060
2061                 if (rc == 0)
2062                         RETURN(0);
2063                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2064                         rc = -EIO;
2065         }
2066
2067         if (rc == 0) {
2068                 struct obdo *oa = aa->aa_oa;
2069                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2070                 unsigned long valid = 0;
2071                 struct cl_object *obj;
2072                 struct osc_async_page *last;
2073
2074                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2075                 obj = osc2cl(last->oap_obj);
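                     /* pga was sorted by offset in osc_build_rpc(), so the
                      * last page carries the highest file offset of this RPC
                      * and bounds the size/KMS updates below */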
2076
2077                 cl_object_attr_lock(obj);
2078                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2079                         attr->cat_blocks = oa->o_blocks;
2080                         valid |= CAT_BLOCKS;
2081                 }
2082                 if (oa->o_valid & OBD_MD_FLMTIME) {
2083                         attr->cat_mtime = oa->o_mtime;
2084                         valid |= CAT_MTIME;
2085                 }
2086                 if (oa->o_valid & OBD_MD_FLATIME) {
2087                         attr->cat_atime = oa->o_atime;
2088                         valid |= CAT_ATIME;
2089                 }
2090                 if (oa->o_valid & OBD_MD_FLCTIME) {
2091                         attr->cat_ctime = oa->o_ctime;
2092                         valid |= CAT_CTIME;
2093                 }
2094
2095                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2096                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2097                         loff_t last_off = last->oap_count + last->oap_obj_off +
2098                                 last->oap_page_off;
2099
2100                         /* Change the file size if this is an out-of-quota
2101                          * or direct IO write and it extends the file size */
2102                         if (loi->loi_lvb.lvb_size < last_off) {
2103                                 attr->cat_size = last_off;
2104                                 valid |= CAT_SIZE;
2105                         }
2106                         /* Extend KMS if it's not a lockless write */
2107                         if (loi->loi_kms < last_off &&
2108                             oap2osc_page(last)->ops_srvlock == 0) {
2109                                 attr->cat_kms = last_off;
2110                                 valid |= CAT_KMS;
2111                         }
2112                 }
2113
2114                 if (valid != 0)
2115                         cl_object_attr_update(env, obj, attr, valid);
2116                 cl_object_attr_unlock(obj);
2117         }
2118         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2119         aa->aa_oa = NULL;
2120
2121         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2122                 osc_inc_unstable_pages(req);
2123
2124         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2125                 list_del_init(&ext->oe_link);
2126                 osc_extent_finish(env, ext, 1,
2127                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2128         }
2129         LASSERT(list_empty(&aa->aa_exts));
2130         LASSERT(list_empty(&aa->aa_oaps));
2131
2132         transferred = (req->rq_bulk == NULL ? /* short io */
2133                        aa->aa_requested_nob :
2134                        req->rq_bulk->bd_nob_transferred);
2135
2136         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2137         ptlrpc_lprocfs_brw(req, transferred);
2138
2139         spin_lock(&cli->cl_loi_list_lock);
2140         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2141          * is called so we know whether to go to sync BRWs or wait for more
2142          * RPCs to complete */
2143         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2144                 cli->cl_w_in_flight--;
2145         else
2146                 cli->cl_r_in_flight--;
2147         osc_wake_cache_waiters(cli);
2148         spin_unlock(&cli->cl_loi_list_lock);
2149
2150         osc_io_unplug(env, cli, NULL);
2151         RETURN(rc);
2152 }
2153
2154 static void brw_commit(struct ptlrpc_request *req)
2155 {
2156         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2157          * this callback (invoked via rq_commit_cb), we need to ensure
2158          * osc_dec_unstable_pages is still called. Otherwise unstable
2159          * pages may be leaked. */
2160         spin_lock(&req->rq_lock);
2161         if (likely(req->rq_unstable)) {
2162                 req->rq_unstable = 0;
2163                 spin_unlock(&req->rq_lock);
2164
2165                 osc_dec_unstable_pages(req);
2166         } else {
2167                 req->rq_committed = 1;
2168                 spin_unlock(&req->rq_lock);
2169         }
2170 }
2171
2172 /**
2173  * Build an RPC from the list of extents @ext_list. The caller must ensure
2174  * that the total number of pages in this list is NOT over max pages per RPC.
2175  * Extents in the list must be in OES_RPC state.
2176  */
2177 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2178                   struct list_head *ext_list, int cmd)
2179 {
2180         struct ptlrpc_request           *req = NULL;
2181         struct osc_extent               *ext;
2182         struct brw_page                 **pga = NULL;
2183         struct osc_brw_async_args       *aa = NULL;
2184         struct obdo                     *oa = NULL;
2185         struct osc_async_page           *oap;
2186         struct osc_object               *obj = NULL;
2187         struct cl_req_attr              *crattr = NULL;
2188         loff_t                          starting_offset = OBD_OBJECT_EOF;
2189         loff_t                          ending_offset = 0;
2190         int                             mpflag = 0;
2191         int                             mem_tight = 0;
2192         int                             page_count = 0;
2193         bool                            soft_sync = false;
2194         bool                            interrupted = false;
2195         bool                            ndelay = false;
2196         int                             i;
2197         int                             grant = 0;
2198         int                             rc;
2199         __u32                           layout_version = 0;
2200         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2201         struct ost_body                 *body;
2202         ENTRY;
2203         LASSERT(!list_empty(ext_list));
2204
2205         /* add pages into rpc_list to build BRW rpc */
2206         list_for_each_entry(ext, ext_list, oe_link) {
2207                 LASSERT(ext->oe_state == OES_RPC);
2208                 mem_tight |= ext->oe_memalloc;
2209                 grant += ext->oe_grants;
2210                 page_count += ext->oe_nr_pages;
2211                 layout_version = MAX(layout_version, ext->oe_layout_version);
2212                 if (obj == NULL)
2213                         obj = ext->oe_obj;
2214         }
2215
2216         soft_sync = osc_over_unstable_soft_limit(cli);
2217         if (mem_tight)
2218                 mpflag = cfs_memory_pressure_get_and_set();
2219
2220         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2221         if (pga == NULL)
2222                 GOTO(out, rc = -ENOMEM);
2223
2224         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2225         if (oa == NULL)
2226                 GOTO(out, rc = -ENOMEM);
2227
2228         i = 0;
2229         list_for_each_entry(ext, ext_list, oe_link) {
2230                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2231                         if (mem_tight)
2232                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2233                         if (soft_sync)
2234                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2235                         pga[i] = &oap->oap_brw_page;
2236                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2237                         i++;
2238
2239                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2240                         if (starting_offset == OBD_OBJECT_EOF ||
2241                             starting_offset > oap->oap_obj_off)
2242                                 starting_offset = oap->oap_obj_off;
2243                         else
2244                                 LASSERT(oap->oap_page_off == 0);
2245                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2246                                 ending_offset = oap->oap_obj_off +
2247                                                 oap->oap_count;
2248                         else
2249                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2250                                         PAGE_SIZE);
2251                         if (oap->oap_interrupted)
2252                                 interrupted = true;
2253                 }
2254                 if (ext->oe_ndelay)
2255                         ndelay = true;
2256         }
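             /* The LASSERTs above enforce that the pages form one contiguous
              * byte range: only the first page of the RPC may start part-way
              * into a page and only the last may end early. */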
2257
2258         /* first page in the list */
2259         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2260
2261         crattr = &osc_env_info(env)->oti_req_attr;
2262         memset(crattr, 0, sizeof(*crattr));
2263         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2264         crattr->cra_flags = ~0ULL;
2265         crattr->cra_page = oap2cl_page(oap);
2266         crattr->cra_oa = oa;
2267         cl_req_attr_set(env, osc2cl(obj), crattr);
2268
2269         if (cmd == OBD_BRW_WRITE) {
2270                 oa->o_grant_used = grant;
2271                 if (layout_version > 0) {
2272                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2273                                PFID(&oa->o_oi.oi_fid), layout_version);
2274
2275                         oa->o_layout_version = layout_version;
2276                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2277                 }
2278         }
2279
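             /* sort pages by object offset, so that the OST sees (and can
              * hopefully allocate) the blocks in offset order (see the
              * comment above sort_brw_pages()) */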
2280         sort_brw_pages(pga, page_count);
2281         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2282         if (rc != 0) {
2283                 CERROR("prep_req failed: %d\n", rc);
2284                 GOTO(out, rc);
2285         }
2286
2287         req->rq_commit_cb = brw_commit;
2288         req->rq_interpret_reply = brw_interpret;
2289         req->rq_memalloc = mem_tight != 0;
2290         oap->oap_request = ptlrpc_request_addref(req);
2291         if (interrupted && !req->rq_intr)
2292                 ptlrpc_mark_interrupted(req);
2293         if (ndelay) {
2294                 req->rq_no_resend = req->rq_no_delay = 1;
2295                 /* We should probably set a shorter timeout value here, to
2296                  * handle ETIMEDOUT in brw_interpret() correctly. */
2297                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2298         }
2299
2300         /* Need to update the timestamps after the request is built in case
2301          * we race with setattr (locally or in queue at OST).  If OST gets
2302          * later setattr before earlier BRW (as determined by the request xid),
2303          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2304          * way to do this in a single call.  bug 10150 */
2305         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2306         crattr->cra_oa = &body->oa;
2307         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2308         cl_req_attr_set(env, osc2cl(obj), crattr);
2309         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2310
2311         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2312         aa = ptlrpc_req_async_args(req);
2313         INIT_LIST_HEAD(&aa->aa_oaps);
2314         list_splice_init(&rpc_list, &aa->aa_oaps);
2315         INIT_LIST_HEAD(&aa->aa_exts);
2316         list_splice_init(ext_list, &aa->aa_exts);
2317
2318         spin_lock(&cli->cl_loi_list_lock);
2319         starting_offset >>= PAGE_SHIFT;
2320         if (cmd == OBD_BRW_READ) {
2321                 cli->cl_r_in_flight++;
2322                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2323                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2324                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2325                                       starting_offset + 1);
2326         } else {
2327                 cli->cl_w_in_flight++;
2328                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2329                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2330                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2331                                       starting_offset + 1);
2332         }
2333         spin_unlock(&cli->cl_loi_list_lock);
2334
2335         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2336                   page_count, aa, cli->cl_r_in_flight,
2337                   cli->cl_w_in_flight);
2338         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2339
2340         ptlrpcd_add_req(req);
2341         rc = 0;
2342         EXIT;
2343
2344 out:
2345         if (mem_tight != 0)
2346                 cfs_memory_pressure_restore(mpflag);
2347
2348         if (rc != 0) {
2349                 LASSERT(req == NULL);
2350
2351                 if (oa)
2352                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2353                 if (pga)
2354                         OBD_FREE(pga, sizeof(*pga) * page_count);
2355                 /* this should happen rarely and is pretty bad, it makes the
2356                  * pending list not follow the dirty order */
2357                 while (!list_empty(ext_list)) {
2358                         ext = list_entry(ext_list->next, struct osc_extent,
2359                                          oe_link);
2360                         list_del_init(&ext->oe_link);
2361                         osc_extent_finish(env, ext, 0, rc);
2362                 }
2363         }
2364         RETURN(rc);
2365 }
2366
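     /* Attach @data to @lock's l_ast_data if none is set yet; return 1 if
      * l_ast_data ends up equal to @data (it was already ours, or was just
      * set), and 0 if the lock already carries different ast data. */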
2367 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2368 {
2369         int set = 0;
2370
2371         LASSERT(lock != NULL);
2372
2373         lock_res_and_lock(lock);
2374
2375         if (lock->l_ast_data == NULL)
2376                 lock->l_ast_data = data;
2377         if (lock->l_ast_data == data)
2378                 set = 1;
2379
2380         unlock_res_and_lock(lock);
2381
2382         return set;
2383 }
2384
2385 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2386                      void *cookie, struct lustre_handle *lockh,
2387                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2388                      int errcode)
2389 {
2390         bool intent = *flags & LDLM_FL_HAS_INTENT;
2391         int rc;
2392         ENTRY;
2393
2394         /* The request was created before the ldlm_cli_enqueue() call. */
2395         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2396                 struct ldlm_reply *rep;
2397
2398                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2399                 LASSERT(rep != NULL);
2400
2401                 rep->lock_policy_res1 =
2402                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2403                 if (rep->lock_policy_res1)
2404                         errcode = rep->lock_policy_res1;
2405                 if (!speculative)
2406                         *flags |= LDLM_FL_LVB_READY;
2407         } else if (errcode == ELDLM_OK) {
2408                 *flags |= LDLM_FL_LVB_READY;
2409         }
2410
2411         /* Call the update callback. */
2412         rc = (*upcall)(cookie, lockh, errcode);
2413
2414         /* release the reference taken in ldlm_cli_enqueue() */
2415         if (errcode == ELDLM_LOCK_MATCHED)
2416                 errcode = ELDLM_OK;
2417         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2418                 ldlm_lock_decref(lockh, mode);
2419
2420         RETURN(rc);
2421 }
2422
2423 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2424                           struct osc_enqueue_args *aa, int rc)
2425 {
2426         struct ldlm_lock *lock;
2427         struct lustre_handle *lockh = &aa->oa_lockh;
2428         enum ldlm_mode mode = aa->oa_mode;
2429         struct ost_lvb *lvb = aa->oa_lvb;
2430         __u32 lvb_len = sizeof(*lvb);
2431         __u64 flags = 0;
2432
2433         ENTRY;
2434
2435         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2436          * be valid. */
2437         lock = ldlm_handle2lock(lockh);
2438         LASSERTF(lock != NULL,
2439                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2440                  lockh->cookie, req, aa);
2441
2442         /* Take an additional reference so that a blocking AST that
2443          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2444          * to arrive after an upcall has been executed by
2445          * osc_enqueue_fini(). */
2446         ldlm_lock_addref(lockh, mode);
2447
2448         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2449         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2450
2451         /* Let the CP AST grant the lock first. */
2452         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2453
2454         if (aa->oa_speculative) {
2455                 LASSERT(aa->oa_lvb == NULL);
2456                 LASSERT(aa->oa_flags == NULL);
2457                 aa->oa_flags = &flags;
2458         }
2459
2460         /* Complete obtaining the lock procedure. */
2461         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2462                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2463                                    lockh, rc);
2464         /* Complete osc stuff. */
2465         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2466                               aa->oa_flags, aa->oa_speculative, rc);
2467
2468         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2469
2470         ldlm_lock_decref(lockh, mode);
2471         LDLM_LOCK_PUT(lock);
2472         RETURN(rc);
2473 }
2474
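     /* Sentinel value, not a real request set: callers pass PTLRPCD_SET to
      * ask that the request be handed to the ptlrpcd daemon rather than
      * added to a private set (see the rqset == PTLRPCD_SET check below). */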
2475 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2476
2477 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2478  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2479  * other synchronous requests; however, keeping some locks while trying to
2480  * obtain others may take a considerable amount of time in case of OST failure,
2481  * and when a client does not release a lock that other sync requests wait on,
2482  * the client is evicted from the cluster -- such scenarios make life
2483  * difficult, so release locks just after they are obtained. */
2484 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2485                      __u64 *flags, union ldlm_policy_data *policy,
2486                      struct ost_lvb *lvb, int kms_valid,
2487                      osc_enqueue_upcall_f upcall, void *cookie,
2488                      struct ldlm_enqueue_info *einfo,
2489                      struct ptlrpc_request_set *rqset, int async,
2490                      bool speculative)
2491 {
2492         struct obd_device *obd = exp->exp_obd;
2493         struct lustre_handle lockh = { 0 };
2494         struct ptlrpc_request *req = NULL;
2495         int intent = *flags & LDLM_FL_HAS_INTENT;
2496         __u64 match_flags = *flags;
2497         enum ldlm_mode mode;
2498         int rc;
2499         ENTRY;
2500
2501         /* Filesystem lock extents are extended to page boundaries so that
2502          * dealing with the page cache is a little smoother.  */
2503         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2504         policy->l_extent.end |= ~PAGE_MASK;
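             /* e.g. with 4KiB pages: start 5000 rounds down to 4096, and
              * end 6000 rounds up to 8191, the last byte of its page */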
2505
2506         /*
2507          * kms is not valid when either object is completely fresh (so that no
2508          * locks are cached), or object was evicted. In the latter case cached
2509          * lock cannot be used, because it would prime inode state with
2510          * potentially stale LVB.
2511          */
2512         if (!kms_valid)
2513                 goto no_match;
2514
2515         /* Next, search for already existing extent locks that will cover us */
2516         /* If we're trying to read, we also search for an existing PW lock.  The
2517          * VFS and page cache already protect us locally, so lots of readers/
2518          * writers can share a single PW lock.
2519          *
2520          * There are problems with conversion deadlocks, so instead of
2521          * converting a read lock to a write lock, we'll just enqueue a new
2522          * one.
2523          *
2524          * At some point we should cancel the read lock instead of making them
2525          * send us a blocking callback, but there are problems with canceling
2526          * locks out from other users right now, too. */
2527         mode = einfo->ei_mode;
2528         if (einfo->ei_mode == LCK_PR)
2529                 mode |= LCK_PW;
2530         /* Normal lock requests must wait for the LVB to be ready before
2531          * matching a lock; speculative lock requests do not need to,
2532          * because they will not actually use the lock. */
2533         if (!speculative)
2534                 match_flags |= LDLM_FL_LVB_READY;
2535         if (intent != 0)
2536                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2537         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2538                                einfo->ei_type, policy, mode, &lockh, 0);
2539         if (mode) {
2540                 struct ldlm_lock *matched;
2541
2542                 if (*flags & LDLM_FL_TEST_LOCK)
2543                         RETURN(ELDLM_OK);
2544
2545                 matched = ldlm_handle2lock(&lockh);
2546                 if (speculative) {
2547                         /* This DLM lock request is speculative, and does not
2548                          * have an associated IO request. Therefore if there
2549                          * is already a DLM lock, it will just inform the
2550                          * caller to cancel the request for this stripe. */
2551                         lock_res_and_lock(matched);
2552                         if (ldlm_extent_equal(&policy->l_extent,
2553                             &matched->l_policy_data.l_extent))
2554                                 rc = -EEXIST;
2555                         else
2556                                 rc = -ECANCELED;
2557                         unlock_res_and_lock(matched);
2558
2559                         ldlm_lock_decref(&lockh, mode);
2560                         LDLM_LOCK_PUT(matched);
2561                         RETURN(rc);
2562                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2563                         *flags |= LDLM_FL_LVB_READY;
2564
2565                         /* We already have a lock, and it's referenced. */
2566                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2567
2568                         ldlm_lock_decref(&lockh, mode);
2569                         LDLM_LOCK_PUT(matched);
2570                         RETURN(ELDLM_OK);
2571                 } else {
2572                         ldlm_lock_decref(&lockh, mode);
2573                         LDLM_LOCK_PUT(matched);
2574                 }
2575         }
2576
2577 no_match:
2578         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2579                 RETURN(-ENOLCK);
2580
2581         if (intent) {
2582                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2583                                            &RQF_LDLM_ENQUEUE_LVB);
2584                 if (req == NULL)
2585                         RETURN(-ENOMEM);
2586
2587                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2588                 if (rc) {
2589                         ptlrpc_request_free(req);
2590                         RETURN(rc);
2591                 }
2592
2593                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2594                                      sizeof(*lvb));
2595                 ptlrpc_request_set_replen(req);
2596         }
2597
2598         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2599         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2600
2601         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2602                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2603         if (async) {
2604                 if (!rc) {
2605                         struct osc_enqueue_args *aa;
2606                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2607                         aa = ptlrpc_req_async_args(req);
2608                         aa->oa_exp         = exp;
2609                         aa->oa_mode        = einfo->ei_mode;
2610                         aa->oa_type        = einfo->ei_type;
2611                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2612                         aa->oa_upcall      = upcall;
2613                         aa->oa_cookie      = cookie;
2614                         aa->oa_speculative = speculative;
2615                         if (!speculative) {
2616                                 aa->oa_flags  = flags;
2617                                 aa->oa_lvb    = lvb;
2618                         } else {
2619                                 /* speculative locks essentially enqueue a
2620                                  * DLM lock in advance, so we don't care
2621                                  * about the result of the enqueue. */
2622                                 aa->oa_lvb    = NULL;
2623                                 aa->oa_flags  = NULL;
2624                         }
2625
2626                         req->rq_interpret_reply =
2627                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2628                         if (rqset == PTLRPCD_SET)
2629                                 ptlrpcd_add_req(req);
2630                         else
2631                                 ptlrpc_set_add_req(rqset, req);
2632                 } else if (intent) {
2633                         ptlrpc_req_finished(req);
2634                 }
2635                 RETURN(rc);
2636         }
2637
2638         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2639                               flags, speculative, rc);
2640         if (intent)
2641                 ptlrpc_req_finished(req);
2642
2643         RETURN(rc);
2644 }
2645
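/**
 * Match an already-granted DLM extent lock covering the given extent.
 *
 * The extent is rounded out to page boundaries before matching, and a
 * PR request also matches an existing PW lock: the VFS and page cache
 * protect readers locally, so many readers and writers can share one
 * PW lock.  When \a data is non-NULL the matched lock's l_ast_data is
 * set to it; if the lock already carries different data the reference
 * is dropped again and 0 is returned, as for no match.
 *
 * \retval mode of the matched lock, or 0 if no covering lock was found
 */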
2646 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2647                    enum ldlm_type type, union ldlm_policy_data *policy,
2648                    enum ldlm_mode mode, __u64 *flags, void *data,
2649                    struct lustre_handle *lockh, int unref)
2650 {
2651         struct obd_device *obd = exp->exp_obd;
2652         __u64 lflags = *flags;
2653         enum ldlm_mode rc;
2654         ENTRY;
2655
2656         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2657                 RETURN(-EIO);
2658
2659         /* Filesystem lock extents are extended to page boundaries so that
2660          * dealing with the page cache is a little smoother */
2661         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2662         policy->l_extent.end |= ~PAGE_MASK;
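        /* The two adjustments above round the extent out to whole pages.
         * For example, assuming 4 KiB pages (so ~PAGE_MASK == 0xfff), a
         * request for [0x1234, 0x5678] becomes [0x1000, 0x5fff]: the start
         * is rounded down and the end up to the last byte of its page. */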
2663
2664         /* Next, search for already existing extent locks that will cover us */
2665         /* If we're trying to read, we also search for an existing PW lock.  The
2666          * VFS and page cache already protect us locally, so lots of readers/
2667          * writers can share a single PW lock. */
2668         rc = mode;
2669         if (mode == LCK_PR)
2670                 rc |= LCK_PW;
2671         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2672                              res_id, type, policy, rc, lockh, unref);
2673         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2674                 RETURN(rc);
2675
2676         if (data != NULL) {
2677                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2678
2679                 LASSERT(lock != NULL);
2680                 if (!osc_set_lock_data(lock, data)) {
2681                         ldlm_lock_decref(lockh, rc);
2682                         rc = 0;
2683                 }
2684                 LDLM_LOCK_PUT(lock);
2685         }
2686         RETURN(rc);
2687 }
2688
2689 static int osc_statfs_interpret(const struct lu_env *env,
2690                                 struct ptlrpc_request *req,
2691                                 struct osc_async_args *aa, int rc)
2692 {
2693         struct obd_statfs *msfs;
2694         ENTRY;
2695
2696         if (rc == -EBADR)
2697                 /* The request has in fact never been sent
2698                  * due to issues at a higher level (LOV).
2699                  * Exit immediately since the caller is
2700                  * aware of the problem and takes care
2701                  * of the cleanup. */
2702                 RETURN(rc);
2703
2704         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2705             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2706                 GOTO(out, rc = 0);
2707
2708         if (rc != 0)
2709                 GOTO(out, rc);
2710
2711         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712         if (msfs == NULL) {
2713                 GOTO(out, rc = -EPROTO);
2714         }
2715
2716         *aa->aa_oi->oi_osfs = *msfs;
2717 out:
2718         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719         RETURN(rc);
2720 }
2721
2722 static int osc_statfs_async(struct obd_export *exp,
2723                             struct obd_info *oinfo, time64_t max_age,
2724                             struct ptlrpc_request_set *rqset)
2725 {
2726         struct obd_device     *obd = class_exp2obd(exp);
2727         struct ptlrpc_request *req;
2728         struct osc_async_args *aa;
2729         int rc;
2730         ENTRY;
2731
2732         /* We could possibly pass max_age in the request (as an absolute
2733          * timestamp or a "seconds.usec ago") so the target can avoid doing
2734          * extra calls into the filesystem if that isn't necessary (e.g.
2735          * during mount that would help a bit).  Having relative timestamps
2736          * is not so great if request processing is slow, while absolute
2737          * timestamps are not ideal because they need time synchronization. */
2738         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2739         if (req == NULL)
2740                 RETURN(-ENOMEM);
2741
2742         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2743         if (rc) {
2744                 ptlrpc_request_free(req);
2745                 RETURN(rc);
2746         }
2747         ptlrpc_request_set_replen(req);
2748         req->rq_request_portal = OST_CREATE_PORTAL;
2749         ptlrpc_at_set_req_timeout(req);
2750
2751         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2752         /* procfs requests must not be delayed or resent, to avoid deadlock */
2753                 req->rq_no_resend = 1;
2754                 req->rq_no_delay = 1;
2755         }
2756
2757         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2758         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2759         aa = ptlrpc_req_async_args(req);
2760         aa->aa_oi = oinfo;
2761
2762         ptlrpc_set_add_req(rqset, req);
2763         RETURN(0);
2764 }
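/*
 * A minimal usage sketch (illustrative only: my_osfs, my_done and the
 * call site below are assumptions, not code from this file):
 *
 *      struct obd_info oinfo = {
 *              .oi_osfs  = &my_osfs,
 *              .oi_flags = OBD_STATFS_NODELAY,
 *              .oi_cb_up = my_done,
 *      };
 *      rc = osc_statfs_async(exp, &oinfo, ktime_get_seconds(), rqset);
 *
 * OBD_STATFS_NODELAY makes the RPC fail fast instead of waiting for
 * recovery.  The reply is consumed by osc_statfs_interpret() above,
 * which copies the server's obd_statfs into oi_osfs and then invokes
 * the oi_cb_up() callback.
 */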
2765
2766 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2767                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2768 {
2769         struct obd_device     *obd = class_exp2obd(exp);
2770         struct obd_statfs     *msfs;
2771         struct ptlrpc_request *req;
2772         struct obd_import     *imp = NULL;
2773         int rc;
2774         ENTRY;
2775
2776
2777         /* Since the request might also come from lprocfs, we need to
2778          * sync this with client_disconnect_export() (bug 15684). */
2779         down_read(&obd->u.cli.cl_sem);
2780         if (obd->u.cli.cl_import)
2781                 imp = class_import_get(obd->u.cli.cl_import);
2782         up_read(&obd->u.cli.cl_sem);
2783         if (!imp)
2784                 RETURN(-ENODEV);
2785
2786         /* We could possibly pass max_age in the request (as an absolute
2787          * timestamp or a "seconds.usec ago") so the target can avoid doing
2788          * extra calls into the filesystem if that isn't necessary (e.g.
2789          * during mount that would help a bit).  Having relative timestamps
2790          * is not so great if request processing is slow, while absolute
2791          * timestamps are not ideal because they need time synchronization. */
2792         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2793
2794         class_import_put(imp);
2795
2796         if (req == NULL)
2797                 RETURN(-ENOMEM);
2798
2799         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2800         if (rc) {
2801                 ptlrpc_request_free(req);
2802                 RETURN(rc);
2803         }
2804         ptlrpc_request_set_replen(req);
2805         req->rq_request_portal = OST_CREATE_PORTAL;
2806         ptlrpc_at_set_req_timeout(req);
2807
2808         if (flags & OBD_STATFS_NODELAY) {
2809         /* procfs requests must not be delayed or resent, to avoid deadlock */
2810                 req->rq_no_resend = 1;
2811                 req->rq_no_delay = 1;
2812         }
2813
2814         rc = ptlrpc_queue_wait(req);
2815         if (rc)
2816                 GOTO(out, rc);
2817
2818         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2819         if (msfs == NULL)
2820                 GOTO(out, rc = -EPROTO);
2821
2822         *osfs = *msfs;
2823
2824         EXIT;
2825 out:
2826         ptlrpc_req_finished(req);
2827         return rc;
2828 }
2829
2830 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2831                          void *karg, void __user *uarg)
2832 {
2833         struct obd_device *obd = exp->exp_obd;
2834         struct obd_ioctl_data *data = karg;
2835         int err = 0;
2836         ENTRY;
2837
2838         if (!try_module_get(THIS_MODULE)) {
2839                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2840                        module_name(THIS_MODULE));
2841                 return -EINVAL;
2842         }
2843         switch (cmd) {
2844         case OBD_IOC_CLIENT_RECOVER:
2845                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2846                                             data->ioc_inlbuf1, 0);
2847                 if (err > 0)
2848                         err = 0;
2849                 GOTO(out, err);
2850         case IOC_OSC_SET_ACTIVE:
2851                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2852                                                data->ioc_offset);
2853                 GOTO(out, err);
2854         case OBD_IOC_PING_TARGET:
2855                 err = ptlrpc_obd_ping(obd);
2856                 GOTO(out, err);
2857         default:
2858                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2859                        cmd, current_comm());
2860                 GOTO(out, err = -ENOTTY);
2861         }
2862 out:
2863         module_put(THIS_MODULE);
2864         return err;
2865 }
2866
2867 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2868                        u32 keylen, void *key, u32 vallen, void *val,
2869                        struct ptlrpc_request_set *set)
2870 {
2871         struct ptlrpc_request *req;
2872         struct obd_device     *obd = exp->exp_obd;
2873         struct obd_import     *imp = class_exp2cliimp(exp);
2874         char                  *tmp;
2875         int                    rc;
2876         ENTRY;
2877
2878         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2879
2880         if (KEY_IS(KEY_CHECKSUM)) {
2881                 if (vallen != sizeof(int))
2882                         RETURN(-EINVAL);
2883                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2884                 RETURN(0);
2885         }
2886
2887         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2888                 sptlrpc_conf_client_adapt(obd);
2889                 RETURN(0);
2890         }
2891
2892         if (KEY_IS(KEY_FLUSH_CTX)) {
2893                 sptlrpc_import_flush_my_ctx(imp);
2894                 RETURN(0);
2895         }
2896
2897         if (KEY_IS(KEY_CACHE_SET)) {
2898                 struct client_obd *cli = &obd->u.cli;
2899
2900                 LASSERT(cli->cl_cache == NULL); /* only once */
2901                 cli->cl_cache = (struct cl_client_cache *)val;
2902                 cl_cache_incref(cli->cl_cache);
2903                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2904
2905                 /* add this OSC to the client cache's LRU list */
2906                 LASSERT(list_empty(&cli->cl_lru_osc));
2907                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2908                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2909                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2910
2911                 RETURN(0);
2912         }
2913
2914         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2915                 struct client_obd *cli = &obd->u.cli;
2916                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2917                 long target = *(long *)val;
2918
2919                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2920                 *(long *)val -= nr;
2921                 RETURN(0);
2922         }
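        /* For example, with 1000 pages on this OSC's LRU and *val == 300
         * above, min(1000 >> 1, 300) == 300 pages are offered for shrinking,
         * and *val is then reduced by the number actually freed so the
         * caller can track its remaining target. */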
2923
2924         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2925                 RETURN(-EINVAL);
2926
2927         /* We pass all other commands directly to the OST. Since nobody calls
2928          * osc methods directly and everybody is supposed to go through LOV,
2929          * we assume LOV checked the values for us.
2930          * The only recognised values so far are evict_by_nid and mds_conn.
2931          * Even if something bad gets through, we'd get an -EINVAL from the
2932          * OST anyway. */
2933
2934         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2935                                                 &RQF_OST_SET_GRANT_INFO :
2936                                                 &RQF_OBD_SET_INFO);
2937         if (req == NULL)
2938                 RETURN(-ENOMEM);
2939
2940         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2941                              RCL_CLIENT, keylen);
2942         if (!KEY_IS(KEY_GRANT_SHRINK))
2943                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2944                                      RCL_CLIENT, vallen);
2945         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2946         if (rc) {
2947                 ptlrpc_request_free(req);
2948                 RETURN(rc);
2949         }
2950
2951         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2952         memcpy(tmp, key, keylen);
2953         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2954                                                         &RMF_OST_BODY :
2955                                                         &RMF_SETINFO_VAL);
2956         memcpy(tmp, val, vallen);
2957
2958         if (KEY_IS(KEY_GRANT_SHRINK)) {
2959                 struct osc_grant_args *aa;
2960                 struct obdo *oa;
2961
2962                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2963                 aa = ptlrpc_req_async_args(req);
2964                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2965                 if (!oa) {
2966                         ptlrpc_req_finished(req);
2967                         RETURN(-ENOMEM);
2968                 }
2969                 *oa = ((struct ost_body *)val)->oa;
2970                 aa->aa_oa = oa;
2971                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2972         }
2973
2974         ptlrpc_request_set_replen(req);
2975         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2976                 LASSERT(set != NULL);
2977                 ptlrpc_set_add_req(set, req);
2978                 ptlrpc_check_set(NULL, set);
2979         } else {
2980                 ptlrpcd_add_req(req);
2981         }
2982
2983         RETURN(0);
2984 }
2985 EXPORT_SYMBOL(osc_set_info_async);
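/*
 * Sketch of a caller shrinking this OSC's LRU through the set_info path
 * (illustrative only; this call site is an assumption, not code from
 * this file):
 *
 *      long target = 256;
 *      rc = osc_set_info_async(env, exp,
 *                              sizeof(KEY_CACHE_LRU_SHRINK),
 *                              KEY_CACHE_LRU_SHRINK,
 *                              sizeof(target), &target, NULL);
 *
 * On return, target has been reduced by the number of pages freed.
 * Note that a NULL request set is accepted only for keys handled
 * locally and for KEY_GRANT_SHRINK, which is queued on ptlrpcd.
 */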
2986
2987 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2988                   struct obd_device *obd, struct obd_uuid *cluuid,
2989                   struct obd_connect_data *data, void *localdata)
2990 {
2991         struct client_obd *cli = &obd->u.cli;
2992
2993         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2994                 long lost_grant;
2995                 long grant;
2996
2997                 spin_lock(&cli->cl_loi_list_lock);
2998                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2999                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3000                         /* restore ocd_grant_blkbits as client page bits */
3001                         data->ocd_grant_blkbits = PAGE_SHIFT;
3002                         grant += cli->cl_dirty_grant;
3003                 } else {
3004                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3005                 }
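                /* "grant ? :" below uses the GNU ?: extension with the
                 * middle operand omitted: report the grant we still hold,
                 * falling back to two full BRW RPCs worth when it is 0. */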
3006                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3007                 lost_grant = cli->cl_lost_grant;
3008                 cli->cl_lost_grant = 0;
3009                 spin_unlock(&cli->cl_loi_list_lock);
3010
3011                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3012                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3013                        data->ocd_version, data->ocd_grant, lost_grant);
3014         }
3015
3016         RETURN(0);
3017 }
3018 EXPORT_SYMBOL(osc_reconnect);
3019
3020 int osc_disconnect(struct obd_export *exp)
3021 {
3022         struct obd_device *obd = class_exp2obd(exp);
3023         int rc;
3024
3025         rc = client_disconnect_export(exp);
3026         /**
3027          * Initially we put del_shrink_grant before disconnect_export, but it
3028          * causes the following problem if setup (connect) and cleanup
3029          * (disconnect) are tangled together.
3030          *      connect p1                     disconnect p2
3031          *   ptlrpc_connect_import
3032          *     ...............               class_manual_cleanup
3033          *                                     osc_disconnect
3034          *                                     del_shrink_grant
3035          *   ptlrpc_connect_interrupt
3036          *     osc_init_grant
3037          *   add this client to shrink list
3038          *                                      cleanup_osc
3039          * Bang! The grant shrink thread triggers the shrink. (bug 18662)
3040          */
3041         osc_del_grant_list(&obd->u.cli);
3042         return rc;
3043 }
3044 EXPORT_SYMBOL(osc_disconnect);
3045
3046 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3047                                  struct hlist_node *hnode, void *arg)
3048 {
3049         struct lu_env *env = arg;
3050         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3051         struct ldlm_lock *lock;
3052         struct osc_object *osc = NULL;
3053         ENTRY;
3054
3055         lock_res(res);
3056         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3057                 if (lock->l_ast_data != NULL && osc == NULL) {
3058                         osc = lock->l_ast_data;
3059                         cl_object_get(osc2cl(osc));
3060                 }
3061
3062                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3063                  * by the 2nd round of the ldlm_namespace_cleanup() call in
3064                  * osc_import_event(). */
3065                 ldlm_clear_cleaned(lock);
3066         }
3067         unlock_res(res);
3068
3069         if (osc != NULL) {
3070                 osc_object_invalidate(env, osc);
3071                 cl_object_put(env, osc2cl(osc));
3072         }
3073
3074         RETURN(0);
3075 }
3076 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3077
3078 static int osc_import_event(struct obd_device *obd,
3079                             struct obd_import *imp,
3080                             enum obd_import_event event)
3081 {
3082         struct client_obd *cli;
3083         int rc = 0;
3084
3085         ENTRY;
3086         LASSERT(imp->imp_obd == obd);
3087
3088         switch (event) {
3089         case IMP_EVENT_DISCON: {
3090                 cli = &obd->u.cli;
3091                 spin_lock(&cli->cl_loi_list_lock);
3092                 cli->cl_avail_grant = 0;
3093                 cli->cl_lost_grant = 0;
3094                 spin_unlock(&cli->cl_loi_list_lock);
3095                 break;
3096         }
3097         case IMP_EVENT_INACTIVE: {
3098                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3099                 break;
3100         }
3101         case IMP_EVENT_INVALIDATE: {
3102                 struct ldlm_namespace *ns = obd->obd_namespace;
3103                 struct lu_env         *env;
3104                 __u16                  refcheck;
3105
3106                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3107
3108                 env = cl_env_get(&refcheck);
3109                 if (!IS_ERR(env)) {
3110                         osc_io_unplug(env, &obd->u.cli, NULL);
3111
3112                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3113                                                  osc_ldlm_resource_invalidate,
3114                                                  env, 0);
3115                         cl_env_put(env, &refcheck);
3116
3117                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3118                 } else
3119                         rc = PTR_ERR(env);
3120                 break;
3121         }
3122         case IMP_EVENT_ACTIVE: {
3123                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3124                 break;
3125         }
3126         case IMP_EVENT_OCD: {
3127                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3128
3129                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3130                         osc_init_grant(&obd->u.cli, ocd);
3131
3132                 /* See bug 7198 */
3133                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3134                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3135
3136                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3137                 break;
3138         }
3139         case IMP_EVENT_DEACTIVATE: {
3140                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3141                 break;
3142         }
3143         case IMP_EVENT_ACTIVATE: {
3144                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3145                 break;
3146         }
3147         default:
3148                 CERROR("Unknown import event %d\n", event);
3149                 LBUG();
3150         }
3151         RETURN(rc);
3152 }
3153
3154 /**
3155  * Determine whether the lock can be canceled before replaying the lock
3156  * during recovery, see bug16774 for detailed information.
3157  *
3158  * \retval zero the lock can't be canceled
3159  * \retval other ok to cancel
3160  */
3161 static int osc_cancel_weight(struct ldlm_lock *lock)
3162 {
3163         /*
3164          * Cancel all unused, granted extent locks.
3165          */
3166         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3167             ldlm_is_granted(lock) &&
3168             osc_ldlm_weigh_ast(lock) == 0)
3169                 RETURN(1);
3170
3171         RETURN(0);
3172 }
3173
3174 static int brw_queue_work(const struct lu_env *env, void *data)
3175 {
3176         struct client_obd *cli = data;
3177
3178         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3179
3180         osc_io_unplug(env, cli, NULL);
3181         RETURN(0);
3182 }
3183
3184 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3185 {
3186         struct client_obd *cli = &obd->u.cli;
3187         void *handler;
3188         int rc;
3189
3190         ENTRY;
3191
3192         rc = ptlrpcd_addref();
3193         if (rc)
3194                 RETURN(rc);
3195
3196         rc = client_obd_setup(obd, lcfg);
3197         if (rc)
3198                 GOTO(out_ptlrpcd, rc);
3199
3200
3201         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3202         if (IS_ERR(handler))
3203                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3204         cli->cl_writeback_work = handler;
3205
3206         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3207         if (IS_ERR(handler))
3208                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3209         cli->cl_lru_work = handler;
3210
3211         rc = osc_quota_setup(obd);
3212         if (rc)
3213                 GOTO(out_ptlrpcd_work, rc);
3214
3215         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3216         osc_update_next_shrink(cli);
3217
3218         RETURN(rc);
3219
3220 out_ptlrpcd_work:
3221         if (cli->cl_writeback_work != NULL) {
3222                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3223                 cli->cl_writeback_work = NULL;
3224         }
3225         if (cli->cl_lru_work != NULL) {
3226                 ptlrpcd_destroy_work(cli->cl_lru_work);
3227                 cli->cl_lru_work = NULL;
3228         }
3229         client_obd_cleanup(obd);
3230 out_ptlrpcd:
3231         ptlrpcd_decref();
3232         RETURN(rc);
3233 }
3234 EXPORT_SYMBOL(osc_setup_common);
3235
3236 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3237 {
3238         struct client_obd *cli = &obd->u.cli;
3239         int                adding;
3240         int                added;
3241         int                req_count;
3242         int                rc;
3243
3244         ENTRY;
3245
3246         rc = osc_setup_common(obd, lcfg);
3247         if (rc < 0)
3248                 RETURN(rc);
3249
3250         rc = osc_tunables_init(obd);
3251         if (rc)
3252                 RETURN(rc);
3253
3254         /*
3255          * We try to control the total number of requests with an upper limit,
3256          * osc_reqpool_maxreqcount. There might be a race that causes a brief
3257          * over-limit allocation, but that is fine.
3258          */
3259         req_count = atomic_read(&osc_pool_req_count);
3260         if (req_count < osc_reqpool_maxreqcount) {
3261                 adding = cli->cl_max_rpcs_in_flight + 2;
3262                 if (req_count + adding > osc_reqpool_maxreqcount)
3263                         adding = osc_reqpool_maxreqcount - req_count;
3264
3265                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3266                 atomic_add(added, &osc_pool_req_count);
3267         }
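        /* Worked example: with osc_reqpool_maxreqcount == 100, req_count ==
         * 95 and cl_max_rpcs_in_flight == 8, adding starts at 8 + 2 == 10
         * and is clamped to 100 - 95 == 5; only the number of requests
         * actually allocated is added back into osc_pool_req_count. */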
3268
3269         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3270
3271         spin_lock(&osc_shrink_lock);
3272         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3273         spin_unlock(&osc_shrink_lock);
3274         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3275         cli->cl_import->imp_idle_debug = D_HA;
3276
3277         RETURN(0);
3278 }
3279
3280 int osc_precleanup_common(struct obd_device *obd)
3281 {
3282         struct client_obd *cli = &obd->u.cli;
3283         ENTRY;
3284
3285         /* LU-464
3286          * for echo client, export may be on zombie list, wait for
3287          * zombie thread to cull it, because cli.cl_import will be
3288          * cleared in client_disconnect_export():
3289          *   class_export_destroy() -> obd_cleanup() ->
3290          *   echo_device_free() -> echo_client_cleanup() ->
3291          *   obd_disconnect() -> osc_disconnect() ->
3292          *   client_disconnect_export()
3293          */
3294         obd_zombie_barrier();
3295         if (cli->cl_writeback_work) {
3296                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3297                 cli->cl_writeback_work = NULL;
3298         }
3299
3300         if (cli->cl_lru_work) {
3301                 ptlrpcd_destroy_work(cli->cl_lru_work);
3302                 cli->cl_lru_work = NULL;
3303         }
3304
3305         obd_cleanup_client_import(obd);
3306         RETURN(0);
3307 }
3308 EXPORT_SYMBOL(osc_precleanup_common);
3309
3310 static int osc_precleanup(struct obd_device *obd)
3311 {
3312         ENTRY;
3313
3314         osc_precleanup_common(obd);
3315
3316         ptlrpc_lprocfs_unregister_obd(obd);
3317         RETURN(0);
3318 }
3319
3320 int osc_cleanup_common(struct obd_device *obd)
3321 {
3322         struct client_obd *cli = &obd->u.cli;
3323         int rc;
3324
3325         ENTRY;
3326
3327         spin_lock(&osc_shrink_lock);
3328         list_del(&cli->cl_shrink_list);
3329         spin_unlock(&osc_shrink_lock);
3330
3331         /* lru cleanup */
3332         if (cli->cl_cache != NULL) {
3333                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3334                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3335                 list_del_init(&cli->cl_lru_osc);
3336                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3337                 cli->cl_lru_left = NULL;
3338                 cl_cache_decref(cli->cl_cache);
3339                 cli->cl_cache = NULL;
3340         }
3341
3342         /* free memory of osc quota cache */
3343         osc_quota_cleanup(obd);
3344
3345         rc = client_obd_cleanup(obd);
3346
3347         ptlrpcd_decref();
3348         RETURN(rc);
3349 }
3350 EXPORT_SYMBOL(osc_cleanup_common);
3351
3352 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3353 {
3354         ssize_t count  = class_modify_config(lcfg, PARAM_OSC,
3355                                              &obd->obd_kset.kobj);
3356         return count > 0 ? 0 : count;
3357 }
3358
3359 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3360 {
3361         return osc_process_config_base(obd, buf);
3362 }
3363
3364 static struct obd_ops osc_obd_ops = {
3365         .o_owner                = THIS_MODULE,
3366         .o_setup                = osc_setup,
3367         .o_precleanup           = osc_precleanup,
3368         .o_cleanup              = osc_cleanup_common,
3369         .o_add_conn             = client_import_add_conn,
3370         .o_del_conn             = client_import_del_conn,
3371         .o_connect              = client_connect_import,
3372         .o_reconnect            = osc_reconnect,
3373         .o_disconnect           = osc_disconnect,
3374         .o_statfs               = osc_statfs,
3375         .o_statfs_async         = osc_statfs_async,
3376         .o_create               = osc_create,
3377         .o_destroy              = osc_destroy,
3378         .o_getattr              = osc_getattr,
3379         .o_setattr              = osc_setattr,
3380         .o_iocontrol            = osc_iocontrol,
3381         .o_set_info_async       = osc_set_info_async,
3382         .o_import_event         = osc_import_event,
3383         .o_process_config       = osc_process_config,
3384         .o_quotactl             = osc_quotactl,
3385 };
3386
3387 static struct shrinker *osc_cache_shrinker;
3388 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3389 DEFINE_SPINLOCK(osc_shrink_lock);
3390
3391 #ifndef HAVE_SHRINKER_COUNT
3392 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3393 {
3394         struct shrink_control scv = {
3395                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3396                 .gfp_mask   = shrink_param(sc, gfp_mask)
3397         };
3398 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3399         struct shrinker *shrinker = NULL;
3400 #endif
3401
3402         (void)osc_cache_shrink_scan(shrinker, &scv);
3403
3404         return osc_cache_shrink_count(shrinker, &scv);
3405 }
3406 #endif
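/*
 * On kernels with the split shrinker API (HAVE_SHRINKER_COUNT),
 * osc_cache_shrink_count()/osc_cache_shrink_scan() are registered
 * directly via DEF_SHRINKER_VAR() in osc_init() below; the wrapper
 * above emulates that pair for the legacy single-callback interface
 * by scanning first and then returning the remaining object count.
 */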
3407
3408 static int __init osc_init(void)
3409 {
3410         bool enable_proc = true;
3411         struct obd_type *type;
3412         unsigned int reqpool_size;
3413         unsigned int reqsize;
3414         int rc;
3415         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3416                          osc_cache_shrink_count, osc_cache_shrink_scan);
3417         ENTRY;
3418
3419         /* print the address of _any_ initialized kernel symbol from this
3420          * module, to allow debugging with a gdb that doesn't support data
3421          * symbols from modules. */
3422         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3423
3424         rc = lu_kmem_init(osc_caches);
3425         if (rc)
3426                 RETURN(rc);
3427
3428         type = class_search_type(LUSTRE_OSP_NAME);
3429         if (type != NULL && type->typ_procsym != NULL)
3430                 enable_proc = false;
3431
3432         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3433                                  LUSTRE_OSC_NAME, &osc_device_type);
3434         if (rc)
3435                 GOTO(out_kmem, rc);
3436
3437         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3438
3439         /* A value this large is obviously too much memory; just prevent overflow */
3440         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3441                 GOTO(out_type, rc = -EINVAL);
3442
3443         reqpool_size = osc_reqpool_mem_max << 20;
3444
3445         reqsize = 1;
3446         while (reqsize < OST_IO_MAXREQSIZE)
3447                 reqsize = reqsize << 1;
3448
3449         /*
3450          * We don't enlarge the request count in OSC pool according to
3451          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3452          * tried after a normal allocation fails, so a small OSC pool won't
3453          * cause much performance degradation in most cases.
3454          */
3455         osc_reqpool_maxreqcount = reqpool_size / reqsize;
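        /*
         * Worked example (sizes assumed for illustration, not taken from
         * this file): with the default osc_reqpool_mem_max of 5 MiB and an
         * OST_IO_MAXREQSIZE that rounds up to a 1 MiB reqsize, the pool is
         * capped at 5 MiB / 1 MiB == 5 requests shared by all OSC devices.
         */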
3456
3457         atomic_set(&osc_pool_req_count, 0);
3458         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3459                                           ptlrpc_add_rqs_to_pool);
3460
3461         if (osc_rq_pool == NULL)
3462                 GOTO(out_type, rc = -ENOMEM);
3463
3464         rc = osc_start_grant_work();
3465         if (rc != 0)
3466                 GOTO(out_req_pool, rc);
3467
3468         RETURN(rc);
3469
3470 out_req_pool:
3471         ptlrpc_free_rq_pool(osc_rq_pool);
3472 out_type:
3473         class_unregister_type(LUSTRE_OSC_NAME);
3474 out_kmem:
3475         lu_kmem_fini(osc_caches);
3476
3477         RETURN(rc);
3478 }
3479
3480 static void __exit osc_exit(void)
3481 {
3482         osc_stop_grant_work();
3483         remove_shrinker(osc_cache_shrinker);
3484         class_unregister_type(LUSTRE_OSC_NAME);
3485         lu_kmem_fini(osc_caches);
3486         ptlrpc_free_rq_pool(osc_rq_pool);
3487 }
3488
3489 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3490 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3491 MODULE_VERSION(LUSTRE_VERSION_STRING);
3492 MODULE_LICENSE("GPL");
3493
3494 module_init(osc_init);
3495 module_exit(osc_exit);