Whamcloud - gitweb
cadef123c90b5af33b1ed88e268f7e79fb6c3934
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include <obd.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
/* Shared state of the OSC request pool: the current number of pooled
 * requests, the upper bound on that number, and the pool itself. */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
64
/* Per-RPC context for an asynchronous bulk read/write (BRW) request,
 * stored in the request's rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* attributes sent/returned */
        int                       aa_requested_nob; /* bytes requested in RPC */
        int                       aa_nio_count;    /* number of niobufs */
        u32                       aa_page_count;   /* number of brw_pages */
        int                       aa_resends;      /* resend attempts so far */
        struct brw_page **aa_ppga;                 /* array of pages in the RPC */
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
        struct cl_req            *aa_clerq;
};

/* grant shrink RPCs reuse the BRW async args (only aa_oa is used there,
 * see osc_shrink_grant_interpret()) */
#define osc_grant_args osc_brw_async_args
79
/* Async-args for OST_SETATTR/OST_PUNCH; consumed by osc_setattr_interpret() */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* attributes in/out */
        obd_enqueue_update_f     sa_upcall;  /* completion callback */
        void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
85
/* Async-args for OST_SYNC; consumed by osc_sync_interpret() */
struct osc_fsync_args {
        struct osc_object       *fa_obj;     /* object being synced */
        struct obdo             *fa_oa;      /* attributes in/out */
        obd_enqueue_update_f    fa_upcall;   /* completion callback */
        void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
92
/* Async-args for a DLM lock enqueue on an OST object. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;   /* completion callback */
        void                    *oa_cookie;  /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;    /* presumably "asynchronous glimpse
                                              * lock" request -- TODO confirm */
};
104
/* forward declarations -- defined later in this file */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
108
/**
 * Pack \a oa into the OST_BODY field of \a req's request buffer,
 * converting to on-wire format according to the import's connect data.
 */
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
118
/**
 * Fetch attributes of the object described by \a oa from its OST.
 *
 * Sends a synchronous OST_GETATTR RPC, unpacks the returned attributes
 * back into \a oa and advertises the client's preferred BRW size via
 * o_blksize.
 *
 * \retval 0 on success, negative errno on failure.
 */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* synchronous RPC: block until the server replies */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* report the optimal I/O size to the caller */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
161
162 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
163                        struct obdo *oa)
164 {
165         struct ptlrpc_request   *req;
166         struct ost_body         *body;
167         int                      rc;
168
169         ENTRY;
170         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
171
172         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
173         if (req == NULL)
174                 RETURN(-ENOMEM);
175
176         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
177         if (rc) {
178                 ptlrpc_request_free(req);
179                 RETURN(rc);
180         }
181
182         osc_pack_req_body(req, oa);
183
184         ptlrpc_request_set_replen(req);
185
186         rc = ptlrpc_queue_wait(req);
187         if (rc)
188                 GOTO(out, rc);
189
190         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
191         if (body == NULL)
192                 GOTO(out, rc = -EPROTO);
193
194         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
195
196         EXIT;
197 out:
198         ptlrpc_req_finished(req);
199
200         RETURN(rc);
201 }
202
/**
 * Reply callback for asynchronous OST_SETATTR/OST_PUNCH requests:
 * on success, unpack the server-returned attributes into sa->sa_oa;
 * in all cases invoke the caller's upcall with the final status.
 */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* the upcall runs even on error so the caller can clean up */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
223
/**
 * Send an OST_SETATTR request asynchronously.
 *
 * With \a rqset == NULL the request is fire-and-forget via ptlrpcd and
 * no reply interpretation happens.  Otherwise osc_setattr_interpret()
 * runs on completion and calls \a upcall with \a cookie; \a oa must
 * stay valid until then.
 *
 * \retval 0 on successful submission, negative errno otherwise.
 */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                /* NB: "interpterer" is the historical spelling of the
                 * typedef in the ptlrpc headers */
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
270
/**
 * Create an OST object described by \a oa.
 *
 * Only echo-sequence objects may be created through this path (enforced
 * by the fid_seq_is_echo() assertion below).  Sends a synchronous
 * OST_CREATE RPC and unpacks the resulting attributes back into \a oa.
 *
 * \retval 0 on success, negative errno on failure.
 */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* synchronous RPC: block until the server replies */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* report the optimal I/O size to the caller */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
321
/**
 * Send an asynchronous OST_PUNCH (truncate a range) request.
 *
 * The punch range travels inside \a oa.  osc_setattr_interpret() runs
 * on completion and calls \a upcall with \a cookie, so \a oa must stay
 * valid until then.
 *
 * \retval 0 on successful submission, negative errno otherwise.
 */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
363
/*
 * Reply callback for OST_SYNC: copy the server-returned attributes into
 * the caller's obdo, refresh the osc object's cached blocks attribute,
 * then invoke the upcall with the final status.
 */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        /* the upcall runs even on error so the caller can clean up */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
402
/**
 * Send an asynchronous OST_SYNC request for \a obj.
 *
 * The sync range is carried in the size/blocks fields of \a oa (see the
 * comment below).  osc_sync_interpret() runs on completion and calls
 * \a upcall with \a cookie; \a oa must stay valid until then.
 *
 * \retval 0 on successful submission, negative errno otherwise.
 */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
446
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 *
 * The cancelled locks are typically packed into a subsequent RPC as
 * early lock cancels (ELC); see osc_destroy(). */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
481
/* Completion handler for OST_DESTROY: release the in-flight slot taken
 * by osc_can_send_destroy() and wake anyone throttled on the waitq. */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}
492
493 static int osc_can_send_destroy(struct client_obd *cli)
494 {
495         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
496             cli->cl_max_rpcs_in_flight) {
497                 /* The destroy request can be sent */
498                 return 1;
499         }
500         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
501             cli->cl_max_rpcs_in_flight) {
502                 /*
503                  * The counter has been modified between the two atomic
504                  * operations.
505                  */
506                 wake_up(&cli->cl_destroy_waitq);
507         }
508         return 0;
509 }
510
/**
 * Destroy the OST object described by \a oa.
 *
 * Matching local locks are cancelled first (discarding cached data) and
 * packed into the request as early lock cancels.  The RPC itself is
 * sent asynchronously via ptlrpcd, but the number of destroys in flight
 * is throttled to cl_max_rpcs_in_flight, so this function may block
 * waiting for a slot.
 *
 * \retval 0 on successful submission, negative errno otherwise.
 */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel matching local locks; they are packed as ELC below */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
567
/*
 * Fill the dirty/grant accounting fields of \a oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so an outgoing RPC can tell the server how much
 * cache and grant this client holds.  Takes cl_loi_list_lock internally.
 * NB: \a writing_bytes is currently unused by this function.
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* tell the server how much more this client could dirty:
                 * at least enough for a full pipeline of RPCs */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
615
/* Re-arm the deadline after which grant shrinking will next be considered
 * for \a cli (see osc_should_shrink_grant()). */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
623
/* Add \a grant bytes to the client's available grant under the LOI lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
630
631 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
632 {
633         if (body->oa.o_valid & OBD_MD_FLGRANT) {
634                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
635                 __osc_update_grant(cli, body->oa.o_grant);
636         }
637 }
638
/* forward declaration: used by the grant shrinking path below */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);
643
/*
 * Reply callback for a grant shrink set_info RPC: on failure, give the
 * locally shrunk grant back to the client; on success, apply whatever
 * grant the server returned.  Frees the obdo allocated by the sender in
 * either case.
 */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* server never saw the shrink: restore the grant locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
664
/*
 * Give up a quarter of the locally available grant, recording the amount
 * in \a oa and flagging it (OBD_FL_SHRINK_GRANT) so the server is told
 * about the shrink; also re-arms the next-shrink deadline.
 */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
678
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                /* already at or below the pipeline target: fall back to
                 * the single-RPC floor */
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
695
/**
 * Shrink the client's available grant down to \a target_bytes, returning
 * the excess to the server via a KEY_GRANT_SHRINK set_info RPC.  The
 * target is clamped to at least one full RPC worth of grant.
 *
 * \retval 0 on success or when no shrink is needed; negative errno on error.
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        /* NOTE(review): cl_avail_grant may have changed since the check
         * above -- the lock was dropped in between */
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC submission failed: restore the grant locally */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
740
/*
 * Decide whether \a client should shrink its grant now: the server must
 * support grant shrinking, the shrink deadline must have (nearly) passed,
 * the import must be fully connected and the available grant must exceed
 * one RPC worth of data.  If the grant is too small to bother, the
 * deadline is simply re-armed.
 */
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
764
/* Periodic timeout callback: walk every client registered on \a item and
 * shrink the grant of those holding more than they need. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}
775
776 static int osc_add_shrink_grant(struct client_obd *client)
777 {
778         int rc;
779
780         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
781                                        TIMEOUT_GRANT,
782                                        osc_grant_shrink_grant_cb, NULL,
783                                        &client->cl_grant_shrink_list);
784         if (rc) {
785                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
786                 return rc;
787         }
788         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
789         osc_update_next_shrink(client);
790         return 0;
791 }
792
/* Unregister \a client from the periodic grant-shrink timeout machinery. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
798
/*
 * Initialise the client's grant accounting from the server's connect
 * reply \a ocd: set the available grant, choose the osc_extent chunk
 * size, and register for periodic grant shrinking when the server
 * supports it.
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
               "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
               cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
838
839 /* We assume that the reason this OSC got a short read is because it read
840  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
841  * via the LOV, and it _knows_ it's reading inside the file, it's just that
842  * this stripe never got written at or beyond this stripe offset yet. */
843 static void handle_short_read(int nob_read, size_t page_count,
844                               struct brw_page **pga)
845 {
846         char *ptr;
847         int i = 0;
848
849         /* skip bytes read OK */
850         while (nob_read > 0) {
851                 LASSERT (page_count > 0);
852
853                 if (pga[i]->count > nob_read) {
854                         /* EOF inside this page */
855                         ptr = kmap(pga[i]->pg) +
856                                 (pga[i]->off & ~PAGE_MASK);
857                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
858                         kunmap(pga[i]->pg);
859                         page_count--;
860                         i++;
861                         break;
862                 }
863
864                 nob_read -= pga[i]->count;
865                 page_count--;
866                 i++;
867         }
868
869         /* zero remaining pages */
870         while (page_count-- > 0) {
871                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
872                 memset(ptr, 0, pga[i]->count);
873                 kunmap(pga[i]->pg);
874                 i++;
875         }
876 }
877
878 static int check_write_rcs(struct ptlrpc_request *req,
879                            int requested_nob, int niocount,
880                            size_t page_count, struct brw_page **pga)
881 {
882         int     i;
883         __u32   *remote_rcs;
884
885         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
886                                                   sizeof(*remote_rcs) *
887                                                   niocount);
888         if (remote_rcs == NULL) {
889                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
890                 return(-EPROTO);
891         }
892
893         /* return error if any niobuf was in error */
894         for (i = 0; i < niocount; i++) {
895                 if ((int)remote_rcs[i] < 0)
896                         return(remote_rcs[i]);
897
898                 if (remote_rcs[i] != 0) {
899                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
900                                 i, remote_rcs[i], req);
901                         return(-EPROTO);
902                 }
903         }
904
905         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
906                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
907                        req->rq_bulk->bd_nob_transferred, requested_nob);
908                 return(-EPROTO);
909         }
910
911         return (0);
912 }
913
914 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
915 {
916         if (p1->flag != p2->flag) {
917                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
918                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
919                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
920
921                 /* warn if we try to combine flags that we don't know to be
922                  * safe to combine */
923                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
924                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
925                               "report this at https://jira.hpdd.intel.com/\n",
926                               p1->flag, p2->flag);
927                 }
928                 return 0;
929         }
930
931         return (p1->off + p1->count == p2->off);
932 }
933
/**
 * Compute the bulk-I/O checksum of type \a cksum_type over the first
 * \a nob bytes carried by the page array \a pga.
 *
 * \param nob        total number of data bytes to checksum
 * \param pg_count   number of entries in \a pga
 * \param pga        pages carrying the bulk data
 * \param opc        OST_READ or OST_WRITE; only consulted by the
 *                   fault-injection hooks below
 * \param cksum_type wire checksum algorithm (converted to a cfs alg)
 *
 * \retval the computed checksum; if hash initialization fails, the
 *         PTR_ERR value is returned cast to u32
 */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* only the bytes that fall inside 'nob' are hashed */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): 'err' from the final step is stored but never
         * checked; the hash was initialized successfully above, so a
         * failure here is presumably not expected — confirm upstream */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
988
/**
 * Build an OST_READ or OST_WRITE ptlrpc request covering \a page_count
 * pages: allocates the request (from osc_rq_pool for writes), sizes and
 * packs the capsule, builds the bulk descriptor, merges contiguous
 * same-flag pages into shared niobufs, and for writes optionally packs
 * a bulk checksum into the request body.
 *
 * \param cmd         OBD_BRW_WRITE or OBD_BRW_READ (only the
 *                    OBD_BRW_WRITE bit is tested)
 * \param cli         client obd for the target OST
 * \param oa          object attributes; the checksum and its type are
 *                    also saved back into \a oa for later verification
 * \param page_count  number of entries in \a pga (must be > 0)
 * \param pga         page array, sorted by ascending offset with no
 *                    gaps in the middle (enforced by the LASSERTFs)
 * \param reqp        on success, the prepared request is returned here
 * \param resend      nonzero on a resend: sets OBD_FL_RECOV_RESEND in
 *                    the request body
 *
 * \retval 0 on success, negative errno on failure
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count distinct niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf rather than starting a new
                 * one when this page is contiguous with it */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1192
/**
 * Diagnose a client/server checksum mismatch on an OST_WRITE reply.
 *
 * Recomputes the checksum over the still-attached pages (using the type
 * the server reported in \a oa) to distinguish between data modified on
 * the client after checksumming (e.g. mmap IO), corruption in transit,
 * and a checksum-type protocol mismatch, and logs the diagnosis.
 *
 * \retval 0 if server and client checksums match (no error)
 * \retval 1 on mismatch; the caller treats this as a retryable error
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the type the server actually used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1237
1238 /* Note rc enters this function as number of bytes transferred */
1239 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1240 {
1241         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1242         const lnet_process_id_t *peer =
1243                         &req->rq_import->imp_connection->c_peer;
1244         struct client_obd *cli = aa->aa_cli;
1245         struct ost_body *body;
1246         u32 client_cksum = 0;
1247         ENTRY;
1248
1249         if (rc < 0 && rc != -EDQUOT) {
1250                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1251                 RETURN(rc);
1252         }
1253
1254         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1255         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1256         if (body == NULL) {
1257                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1258                 RETURN(-EPROTO);
1259         }
1260
1261         /* set/clear over quota flag for a uid/gid */
1262         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1263             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1264                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1265
1266                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1267                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1268                        body->oa.o_flags);
1269                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1270         }
1271
1272         osc_update_grant(cli, body);
1273
1274         if (rc < 0)
1275                 RETURN(rc);
1276
1277         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1278                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1279
1280         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1281                 if (rc > 0) {
1282                         CERROR("Unexpected +ve rc %d\n", rc);
1283                         RETURN(-EPROTO);
1284                 }
1285                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1286
1287                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1288                         RETURN(-EAGAIN);
1289
1290                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1291                     check_write_checksum(&body->oa, peer, client_cksum,
1292                                          body->oa.o_cksum, aa->aa_requested_nob,
1293                                          aa->aa_page_count, aa->aa_ppga,
1294                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1295                         RETURN(-EAGAIN);
1296
1297                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1298                                      aa->aa_page_count, aa->aa_ppga);
1299                 GOTO(out, rc);
1300         }
1301
1302         /* The rest of this function executes only for OST_READs */
1303
1304         /* if unwrap_bulk failed, return -EAGAIN to retry */
1305         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1306         if (rc < 0)
1307                 GOTO(out, rc = -EAGAIN);
1308
1309         if (rc > aa->aa_requested_nob) {
1310                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1311                        aa->aa_requested_nob);
1312                 RETURN(-EPROTO);
1313         }
1314
1315         if (rc != req->rq_bulk->bd_nob_transferred) {
1316                 CERROR ("Unexpected rc %d (%d transferred)\n",
1317                         rc, req->rq_bulk->bd_nob_transferred);
1318                 return (-EPROTO);
1319         }
1320
1321         if (rc < aa->aa_requested_nob)
1322                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1323
1324         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1325                 static int cksum_counter;
1326                 u32        server_cksum = body->oa.o_cksum;
1327                 char      *via = "";
1328                 char      *router = "";
1329                 cksum_type_t cksum_type;
1330
1331                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1332                                                body->oa.o_flags : 0);
1333                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1334                                                  aa->aa_ppga, OST_READ,
1335                                                  cksum_type);
1336
1337                 if (peer->nid != req->rq_bulk->bd_sender) {
1338                         via = " via ";
1339                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1340                 }
1341
1342                 if (server_cksum != client_cksum) {
1343                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1344                                            "%s%s%s inode "DFID" object "DOSTID
1345                                            " extent ["LPU64"-"LPU64"]\n",
1346                                            req->rq_import->imp_obd->obd_name,
1347                                            libcfs_nid2str(peer->nid),
1348                                            via, router,
1349                                            body->oa.o_valid & OBD_MD_FLFID ?
1350                                                 body->oa.o_parent_seq : (__u64)0,
1351                                            body->oa.o_valid & OBD_MD_FLFID ?
1352                                                 body->oa.o_parent_oid : 0,
1353                                            body->oa.o_valid & OBD_MD_FLFID ?
1354                                                 body->oa.o_parent_ver : 0,
1355                                            POSTID(&body->oa.o_oi),
1356                                            aa->aa_ppga[0]->off,
1357                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1358                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1359                                                                         1);
1360                         CERROR("client %x, server %x, cksum_type %x\n",
1361                                client_cksum, server_cksum, cksum_type);
1362                         cksum_counter = 0;
1363                         aa->aa_oa->o_cksum = client_cksum;
1364                         rc = -EAGAIN;
1365                 } else {
1366                         cksum_counter++;
1367                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1368                         rc = 0;
1369                 }
1370         } else if (unlikely(client_cksum)) {
1371                 static int cksum_missed;
1372
1373                 cksum_missed++;
1374                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1375                         CERROR("Checksum %u requested from %s but not sent\n",
1376                                cksum_missed, libcfs_nid2str(peer->nid));
1377         } else {
1378                 rc = 0;
1379         }
1380 out:
1381         if (rc >= 0)
1382                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1383                                      aa->aa_oa, &body->oa);
1384
1385         RETURN(rc);
1386 }
1387
/**
 * Rebuild and resend a BRW RPC after a recoverable error.
 *
 * A fresh request is prepared over the same pages and obdo (with the
 * resend flag set, so OBD_FL_RECOV_RESEND is packed), the async args
 * plus oap and extent lists are moved from the old request to the new
 * one, each oap's request reference is switched over, and the new
 * request is handed to ptlrpcd.
 *
 * \param request  the failed BRW request
 * \param aa       its async args (cli, obdo, page array, oap/extent lists)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0 on success, -EINTR if an oap was interrupted, or the error
 *         from rebuilding the request
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        /* abort the redo if any page of the old request was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* each oap now pins the new request instead of the old one */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1456
1457 /*
1458  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1459  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1460  * fine for our small page arrays and doesn't require allocation.  its an
1461  * insertion sort that swaps elements that are strides apart, shrinking the
1462  * stride down until its '1' and the array is sorted.
1463  */
1464 static void sort_brw_pages(struct brw_page **array, int num)
1465 {
1466         int stride, i, j;
1467         struct brw_page *tmp;
1468
1469         if (num == 1)
1470                 return;
1471         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1472                 ;
1473
1474         do {
1475                 stride /= 3;
1476                 for (i = stride ; i < num ; i++) {
1477                         tmp = array[i];
1478                         j = i;
1479                         while (j >= stride && array[j - stride]->off > tmp->off) {
1480                                 array[j] = array[j - stride];
1481                                 j -= stride;
1482                         }
1483                         array[j] = tmp;
1484                 }
1485         } while (stride > 1);
1486 }
1487
/* Free a brw_page pointer array of 'count' entries.  Only the array
 * storage itself is released here; the brw_page structures and pages it
 * points to are not touched. */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1493
/*
 * Interpret callback for a BRW (bulk read/write) RPC.
 *
 * Finalizes the bulk transfer (osc_brw_fini_request), retries recoverable
 * failures, folds the attributes returned in the reply into the cl_object,
 * finishes every extent carried by the RPC, and removes the RPC from the
 * client's in-flight accounting.
 *
 * \param data	the osc_brw_async_args stored in \a req's rq_async_args by
 *		osc_build_rpc()
 * \param rc	transport-level result; replaced below by the request result
 *
 * \retval 0 when the request was resent successfully, otherwise the final
 *	   (possibly negative) request status.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                /* A generation mismatch means the import was evicted and
                 * reconnected since this request was sent: do not resend. */
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0: the resend was queued; completion will run again. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Fold the attributes the server returned into the
                 * cl_object, under the attribute lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        /* End of the last byte written by this RPC (pages are
                         * sorted by offset, so "last" is the highest one). */
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successful writes leave pages "unstable" until the server commits
         * the transaction; see brw_commit() for the matching decrement. */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        /* An RPC slot was freed; try to launch queued IO. */
        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
1612
1613 static void brw_commit(struct ptlrpc_request *req)
1614 {
1615         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1616          * this called via the rq_commit_cb, I need to ensure
1617          * osc_dec_unstable_pages is still called. Otherwise unstable
1618          * pages may be leaked. */
1619         spin_lock(&req->rq_lock);
1620         if (likely(req->rq_unstable)) {
1621                 req->rq_unstable = 0;
1622                 spin_unlock(&req->rq_lock);
1623
1624                 osc_dec_unstable_pages(req);
1625         } else {
1626                 req->rq_committed = 1;
1627                 spin_unlock(&req->rq_lock);
1628         }
1629 }
1630
1631 /**
1632  * Build an RPC by the list of extent @ext_list. The caller must ensure
1633  * that the total pages in this list are NOT over max pages per RPC.
1634  * Extents in the list must be in OES_RPC state.
1635  */
1636 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1637                   struct list_head *ext_list, int cmd)
1638 {
1639         struct ptlrpc_request           *req = NULL;
1640         struct osc_extent               *ext;
1641         struct brw_page                 **pga = NULL;
1642         struct osc_brw_async_args       *aa = NULL;
1643         struct obdo                     *oa = NULL;
1644         struct osc_async_page           *oap;
1645         struct osc_async_page           *tmp;
1646         struct cl_req                   *clerq = NULL;
1647         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1648                                                                       CRT_READ;
1649         struct cl_req_attr              *crattr = NULL;
1650         loff_t                          starting_offset = OBD_OBJECT_EOF;
1651         loff_t                          ending_offset = 0;
1652         int                             mpflag = 0;
1653         int                             mem_tight = 0;
1654         int                             page_count = 0;
1655         bool                            soft_sync = false;
1656         int                             i;
1657         int                             rc;
1658         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1659         struct ost_body                 *body;
1660         ENTRY;
1661         LASSERT(!list_empty(ext_list));
1662
1663         /* add pages into rpc_list to build BRW rpc */
1664         list_for_each_entry(ext, ext_list, oe_link) {
1665                 LASSERT(ext->oe_state == OES_RPC);
1666                 mem_tight |= ext->oe_memalloc;
1667                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1668                         ++page_count;
1669                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1670                         if (starting_offset == OBD_OBJECT_EOF ||
1671                             starting_offset > oap->oap_obj_off)
1672                                 starting_offset = oap->oap_obj_off;
1673                         else
1674                                 LASSERT(oap->oap_page_off == 0);
1675                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1676                                 ending_offset = oap->oap_obj_off +
1677                                                 oap->oap_count;
1678                         else
1679                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1680                                         PAGE_CACHE_SIZE);
1681                 }
1682         }
1683
1684         soft_sync = osc_over_unstable_soft_limit(cli);
1685         if (mem_tight)
1686                 mpflag = cfs_memory_pressure_get_and_set();
1687
1688         OBD_ALLOC(crattr, sizeof(*crattr));
1689         if (crattr == NULL)
1690                 GOTO(out, rc = -ENOMEM);
1691
1692         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1693         if (pga == NULL)
1694                 GOTO(out, rc = -ENOMEM);
1695
1696         OBDO_ALLOC(oa);
1697         if (oa == NULL)
1698                 GOTO(out, rc = -ENOMEM);
1699
1700         i = 0;
1701         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1702                 struct cl_page *page = oap2cl_page(oap);
1703                 if (clerq == NULL) {
1704                         clerq = cl_req_alloc(env, page, crt,
1705                                              1 /* only 1-object rpcs for now */);
1706                         if (IS_ERR(clerq))
1707                                 GOTO(out, rc = PTR_ERR(clerq));
1708                 }
1709                 if (mem_tight)
1710                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1711                 if (soft_sync)
1712                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1713                 pga[i] = &oap->oap_brw_page;
1714                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1715                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1716                        pga[i]->pg, page_index(oap->oap_page), oap,
1717                        pga[i]->flag);
1718                 i++;
1719                 cl_req_page_add(env, clerq, page);
1720         }
1721
1722         /* always get the data for the obdo for the rpc */
1723         LASSERT(clerq != NULL);
1724         crattr->cra_oa = oa;
1725         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1726
1727         rc = cl_req_prep(env, clerq);
1728         if (rc != 0) {
1729                 CERROR("cl_req_prep failed: %d\n", rc);
1730                 GOTO(out, rc);
1731         }
1732
1733         sort_brw_pages(pga, page_count);
1734         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1735         if (rc != 0) {
1736                 CERROR("prep_req failed: %d\n", rc);
1737                 GOTO(out, rc);
1738         }
1739
1740         req->rq_commit_cb = brw_commit;
1741         req->rq_interpret_reply = brw_interpret;
1742
1743         if (mem_tight != 0)
1744                 req->rq_memalloc = 1;
1745
1746         /* Need to update the timestamps after the request is built in case
1747          * we race with setattr (locally or in queue at OST).  If OST gets
1748          * later setattr before earlier BRW (as determined by the request xid),
1749          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1750          * way to do this in a single call.  bug 10150 */
1751         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1752         crattr->cra_oa = &body->oa;
1753         cl_req_attr_set(env, clerq, crattr,
1754                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1755
1756         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1757
1758         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1759         aa = ptlrpc_req_async_args(req);
1760         INIT_LIST_HEAD(&aa->aa_oaps);
1761         list_splice_init(&rpc_list, &aa->aa_oaps);
1762         INIT_LIST_HEAD(&aa->aa_exts);
1763         list_splice_init(ext_list, &aa->aa_exts);
1764         aa->aa_clerq = clerq;
1765
1766         /* queued sync pages can be torn down while the pages
1767          * were between the pending list and the rpc */
1768         tmp = NULL;
1769         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1770                 /* only one oap gets a request reference */
1771                 if (tmp == NULL)
1772                         tmp = oap;
1773                 if (oap->oap_interrupted && !req->rq_intr) {
1774                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1775                                         oap, req);
1776                         ptlrpc_mark_interrupted(req);
1777                 }
1778         }
1779         if (tmp != NULL)
1780                 tmp->oap_request = ptlrpc_request_addref(req);
1781
1782         spin_lock(&cli->cl_loi_list_lock);
1783         starting_offset >>= PAGE_CACHE_SHIFT;
1784         if (cmd == OBD_BRW_READ) {
1785                 cli->cl_r_in_flight++;
1786                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1787                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1788                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1789                                       starting_offset + 1);
1790         } else {
1791                 cli->cl_w_in_flight++;
1792                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1793                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1794                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1795                                       starting_offset + 1);
1796         }
1797         spin_unlock(&cli->cl_loi_list_lock);
1798
1799         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1800                   page_count, aa, cli->cl_r_in_flight,
1801                   cli->cl_w_in_flight);
1802
1803         ptlrpcd_add_req(req);
1804         rc = 0;
1805         EXIT;
1806
1807 out:
1808         if (mem_tight != 0)
1809                 cfs_memory_pressure_restore(mpflag);
1810
1811         if (crattr != NULL)
1812                 OBD_FREE(crattr, sizeof(*crattr));
1813
1814         if (rc != 0) {
1815                 LASSERT(req == NULL);
1816
1817                 if (oa)
1818                         OBDO_FREE(oa);
1819                 if (pga)
1820                         OBD_FREE(pga, sizeof(*pga) * page_count);
1821                 /* this should happen rarely and is pretty bad, it makes the
1822                  * pending list not follow the dirty order */
1823                 while (!list_empty(ext_list)) {
1824                         ext = list_entry(ext_list->next, struct osc_extent,
1825                                          oe_link);
1826                         list_del_init(&ext->oe_link);
1827                         osc_extent_finish(env, ext, 0, rc);
1828                 }
1829                 if (clerq && !IS_ERR(clerq))
1830                         cl_req_completion(env, clerq, rc);
1831         }
1832         RETURN(rc);
1833 }
1834
1835 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1836                                         struct ldlm_enqueue_info *einfo)
1837 {
1838         void *data = einfo->ei_cbdata;
1839         int set = 0;
1840
1841         LASSERT(lock != NULL);
1842         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1843         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1844         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1845         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1846
1847         lock_res_and_lock(lock);
1848
1849         if (lock->l_ast_data == NULL)
1850                 lock->l_ast_data = data;
1851         if (lock->l_ast_data == data)
1852                 set = 1;
1853
1854         unlock_res_and_lock(lock);
1855
1856         return set;
1857 }
1858
1859 static int osc_set_data_with_check(struct lustre_handle *lockh,
1860                                    struct ldlm_enqueue_info *einfo)
1861 {
1862         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1863         int set = 0;
1864
1865         if (lock != NULL) {
1866                 set = osc_set_lock_data_with_check(lock, einfo);
1867                 LDLM_LOCK_PUT(lock);
1868         } else
1869                 CERROR("lockh %p, data %p - client evicted?\n",
1870                        lockh, einfo->ei_cbdata);
1871         return set;
1872 }
1873
/*
 * Finish an OST lock enqueue: translate the server reply into the final
 * error code, mark the lock LVB-ready where appropriate, run the caller's
 * upcall and release the reference taken in ldlm_cli_enqueue().
 *
 * \param agl	non-zero for speculative (asynchronous glimpse) enqueues,
 *		which do not set LDLM_FL_LVB_READY on an aborted intent
 *
 * \retval the upcall's return value.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, enum ldlm_mode mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* The real result of an aborted intent enqueue is carried
                 * in lock_policy_res1 (in network byte order). */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
1911
/*
 * Interpret callback for an asynchronous lock enqueue: complete the LDLM
 * side via ldlm_cli_enqueue_fini(), then the OSC side via
 * osc_enqueue_fini(), with careful lock reference ordering so blocking
 * ASTs cannot outrun the upcall.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* AGL enqueues carry no caller flags/LVB (see osc_enqueue_base());
         * borrow a stack variable so the fini calls have flags to update. */
        if (aa->oa_agl) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Drop the extra reference taken above, and the enqueue's lock
         * reference obtained via the handle. */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
1964
1965 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1966
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL (speculative) enqueues may match locks whose LVB is not
         * ready yet; normal enqueues require a ready LVB. */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it will just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock belongs to someone else; fall
                         * through and enqueue a new one. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        /* Intent enqueues carry an LVB in the reply; allocate the request
         * here so its reply buffer can be sized for it. */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the request's async args. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2118
2119 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2120                    enum ldlm_type type, union ldlm_policy_data *policy,
2121                    enum ldlm_mode mode, __u64 *flags, void *data,
2122                    struct lustre_handle *lockh, int unref)
2123 {
2124         struct obd_device *obd = exp->exp_obd;
2125         __u64 lflags = *flags;
2126         enum ldlm_mode rc;
2127         ENTRY;
2128
2129         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2130                 RETURN(-EIO);
2131
2132         /* Filesystem lock extents are extended to page boundaries so that
2133          * dealing with the page cache is a little smoother */
2134         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2135         policy->l_extent.end |= ~PAGE_MASK;
2136
2137         /* Next, search for already existing extent locks that will cover us */
2138         /* If we're trying to read, we also search for an existing PW lock.  The
2139          * VFS and page cache already protect us locally, so lots of readers/
2140          * writers can share a single PW lock. */
2141         rc = mode;
2142         if (mode == LCK_PR)
2143                 rc |= LCK_PW;
2144         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2145                              res_id, type, policy, rc, lockh, unref);
2146         if (rc) {
2147                 if (data != NULL) {
2148                         if (!osc_set_data_with_check(lockh, data)) {
2149                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2150                                         ldlm_lock_decref(lockh, rc);
2151                                 RETURN(0);
2152                         }
2153                 }
2154                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2155                         ldlm_lock_addref(lockh, LCK_PR);
2156                         ldlm_lock_decref(lockh, LCK_PW);
2157                 }
2158                 RETURN(rc);
2159         }
2160         RETURN(rc);
2161 }
2162
2163 static int osc_statfs_interpret(const struct lu_env *env,
2164                                 struct ptlrpc_request *req,
2165                                 struct osc_async_args *aa, int rc)
2166 {
2167         struct obd_statfs *msfs;
2168         ENTRY;
2169
2170         if (rc == -EBADR)
2171                 /* The request has in fact never been sent
2172                  * due to issues at a higher level (LOV).
2173                  * Exit immediately since the caller is
2174                  * aware of the problem and takes care
2175                  * of the clean up */
2176                  RETURN(rc);
2177
2178         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2179             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2180                 GOTO(out, rc = 0);
2181
2182         if (rc != 0)
2183                 GOTO(out, rc);
2184
2185         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2186         if (msfs == NULL) {
2187                 GOTO(out, rc = -EPROTO);
2188         }
2189
2190         *aa->aa_oi->oi_osfs = *msfs;
2191 out:
2192         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2193         RETURN(rc);
2194 }
2195
2196 static int osc_statfs_async(struct obd_export *exp,
2197                             struct obd_info *oinfo, __u64 max_age,
2198                             struct ptlrpc_request_set *rqset)
2199 {
2200         struct obd_device     *obd = class_exp2obd(exp);
2201         struct ptlrpc_request *req;
2202         struct osc_async_args *aa;
2203         int                    rc;
2204         ENTRY;
2205
2206         /* We could possibly pass max_age in the request (as an absolute
2207          * timestamp or a "seconds.usec ago") so the target can avoid doing
2208          * extra calls into the filesystem if that isn't necessary (e.g.
2209          * during mount that would help a bit).  Having relative timestamps
2210          * is not so great if request processing is slow, while absolute
2211          * timestamps are not ideal because they need time synchronization. */
2212         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2213         if (req == NULL)
2214                 RETURN(-ENOMEM);
2215
2216         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2217         if (rc) {
2218                 ptlrpc_request_free(req);
2219                 RETURN(rc);
2220         }
2221         ptlrpc_request_set_replen(req);
2222         req->rq_request_portal = OST_CREATE_PORTAL;
2223         ptlrpc_at_set_req_timeout(req);
2224
2225         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2226                 /* procfs requests not want stat in wait for avoid deadlock */
2227                 req->rq_no_resend = 1;
2228                 req->rq_no_delay = 1;
2229         }
2230
2231         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2232         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2233         aa = ptlrpc_req_async_args(req);
2234         aa->aa_oi = oinfo;
2235
2236         ptlrpc_set_add_req(rqset, req);
2237         RETURN(0);
2238 }
2239
2240 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2241                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2242 {
2243         struct obd_device     *obd = class_exp2obd(exp);
2244         struct obd_statfs     *msfs;
2245         struct ptlrpc_request *req;
2246         struct obd_import     *imp = NULL;
2247         int rc;
2248         ENTRY;
2249
2250         /*Since the request might also come from lprocfs, so we need
2251          *sync this with client_disconnect_export Bug15684*/
2252         down_read(&obd->u.cli.cl_sem);
2253         if (obd->u.cli.cl_import)
2254                 imp = class_import_get(obd->u.cli.cl_import);
2255         up_read(&obd->u.cli.cl_sem);
2256         if (!imp)
2257                 RETURN(-ENODEV);
2258
2259         /* We could possibly pass max_age in the request (as an absolute
2260          * timestamp or a "seconds.usec ago") so the target can avoid doing
2261          * extra calls into the filesystem if that isn't necessary (e.g.
2262          * during mount that would help a bit).  Having relative timestamps
2263          * is not so great if request processing is slow, while absolute
2264          * timestamps are not ideal because they need time synchronization. */
2265         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2266
2267         class_import_put(imp);
2268
2269         if (req == NULL)
2270                 RETURN(-ENOMEM);
2271
2272         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2273         if (rc) {
2274                 ptlrpc_request_free(req);
2275                 RETURN(rc);
2276         }
2277         ptlrpc_request_set_replen(req);
2278         req->rq_request_portal = OST_CREATE_PORTAL;
2279         ptlrpc_at_set_req_timeout(req);
2280
2281         if (flags & OBD_STATFS_NODELAY) {
2282                 /* procfs requests not want stat in wait for avoid deadlock */
2283                 req->rq_no_resend = 1;
2284                 req->rq_no_delay = 1;
2285         }
2286
2287         rc = ptlrpc_queue_wait(req);
2288         if (rc)
2289                 GOTO(out, rc);
2290
2291         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2292         if (msfs == NULL) {
2293                 GOTO(out, rc = -EPROTO);
2294         }
2295
2296         *osfs = *msfs;
2297
2298         EXIT;
2299  out:
2300         ptlrpc_req_finished(req);
2301         return rc;
2302 }
2303
2304 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2305                          void *karg, void *uarg)
2306 {
2307         struct obd_device *obd = exp->exp_obd;
2308         struct obd_ioctl_data *data = karg;
2309         int err = 0;
2310         ENTRY;
2311
2312         if (!try_module_get(THIS_MODULE)) {
2313                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2314                        module_name(THIS_MODULE));
2315                 return -EINVAL;
2316         }
2317         switch (cmd) {
2318         case OBD_IOC_CLIENT_RECOVER:
2319                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2320                                             data->ioc_inlbuf1, 0);
2321                 if (err > 0)
2322                         err = 0;
2323                 GOTO(out, err);
2324         case IOC_OSC_SET_ACTIVE:
2325                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2326                                                data->ioc_offset);
2327                 GOTO(out, err);
2328         case OBD_IOC_PING_TARGET:
2329                 err = ptlrpc_obd_ping(obd);
2330                 GOTO(out, err);
2331         default:
2332                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2333                        cmd, current_comm());
2334                 GOTO(out, err = -ENOTTY);
2335         }
2336 out:
2337         module_put(THIS_MODULE);
2338         return err;
2339 }
2340
/**
 * obd_set_info_async() handler for the OSC layer.
 *
 * Keys handled locally without an RPC: KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET and KEY_CACHE_LRU_SHRINK.  Any other key
 * is forwarded to the OST in an OST_SET_INFO RPC; KEY_GRANT_SHRINK uses
 * a dedicated request format and is executed by ptlrpcd with
 * osc_shrink_grant_interpret() as the reply handler.
 *
 * \param[in]     env    execution environment
 * \param[in]     exp    export to the target OST
 * \param[in]     keylen length of \a key in bytes
 * \param[in]     key    name of the value being set
 * \param[in]     vallen length of \a val in bytes
 * \param[in,out] val    value to set (decremented in place for
 *                       KEY_CACHE_LRU_SHRINK to report pages shrunk)
 * \param[in]     set    request set the forwarded RPC is added to; may be
 *                       NULL only for KEY_GRANT_SHRINK
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        /* fault-injection delay point for shutdown race testing */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                /* normalize any non-zero value to 1 */
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of the pages currently on the LRU,
                 * bounded by the caller's requested target */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages remain to be shrunk */
                *(long *)val -= nr;
                RETURN(0);
        }

        /* everything past this point sends an RPC; a request set is
         * required except for the self-managed grant-shrink case */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* buffer sizes must be set before packing the request */
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant shrink carries an ost_body instead of a raw value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* copy the obdo so the interpret callback owns it
                 * independently of the caller's buffer */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                /* grant shrink is driven by ptlrpcd, not the caller's set */
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
2460
2461 static int osc_reconnect(const struct lu_env *env,
2462                          struct obd_export *exp, struct obd_device *obd,
2463                          struct obd_uuid *cluuid,
2464                          struct obd_connect_data *data,
2465                          void *localdata)
2466 {
2467         struct client_obd *cli = &obd->u.cli;
2468
2469         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2470                 long lost_grant;
2471
2472                 spin_lock(&cli->cl_loi_list_lock);
2473                 data->ocd_grant = (cli->cl_avail_grant +
2474                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2475                                   2 * cli_brw_size(obd);
2476                 lost_grant = cli->cl_lost_grant;
2477                 cli->cl_lost_grant = 0;
2478                 spin_unlock(&cli->cl_loi_list_lock);
2479
2480                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2481                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2482                        data->ocd_version, data->ocd_grant, lost_grant);
2483         }
2484
2485         RETURN(0);
2486 }
2487
2488 static int osc_disconnect(struct obd_export *exp)
2489 {
2490         struct obd_device *obd = class_exp2obd(exp);
2491         int rc;
2492
2493         rc = client_disconnect_export(exp);
2494         /**
2495          * Initially we put del_shrink_grant before disconnect_export, but it
2496          * causes the following problem if setup (connect) and cleanup
2497          * (disconnect) are tangled together.
2498          *      connect p1                     disconnect p2
2499          *   ptlrpc_connect_import
2500          *     ...............               class_manual_cleanup
2501          *                                     osc_disconnect
2502          *                                     del_shrink_grant
2503          *   ptlrpc_connect_interrupt
2504          *     init_grant_shrink
2505          *   add this client to shrink list
2506          *                                      cleanup_osc
2507          * Bang! pinger trigger the shrink.
2508          * So the osc should be disconnected from the shrink list, after we
2509          * are sure the import has been destroyed. BUG18662
2510          */
2511         if (obd->u.cli.cl_import == NULL)
2512                 osc_del_shrink_grant(&obd->u.cli);
2513         return rc;
2514 }
2515
/**
 * React to import state changes on the OSC's connection to the OST.
 *
 * Grant accounting is reset on disconnect and re-initialized when the
 * connect data arrives (IMP_EVENT_OCD); an invalidated import flushes all
 * cached pages and local locks; the remaining events are forwarded to the
 * obd observer.
 *
 * \param[in] obd   this OSC device
 * \param[in] imp   the import the event applies to (must belong to \a obd)
 * \param[in] event which state change occurred
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: all grant state is now stale */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL);

                        /* drop locks that only matter locally now that the
                         * server side is gone */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data negotiated: (re)initialize grants and the
                 * request portal according to the server's capabilities */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2589
2590 /**
2591  * Determine whether the lock can be canceled before replaying the lock
2592  * during recovery, see bug16774 for detailed information.
2593  *
2594  * \retval zero the lock can't be canceled
2595  * \retval other ok to cancel
2596  */
2597 static int osc_cancel_weight(struct ldlm_lock *lock)
2598 {
2599         /*
2600          * Cancel all unused and granted extent lock.
2601          */
2602         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2603             lock->l_granted_mode == lock->l_req_mode &&
2604             osc_ldlm_weigh_ast(lock) == 0)
2605                 RETURN(1);
2606
2607         RETURN(0);
2608 }
2609
2610 static int brw_queue_work(const struct lu_env *env, void *data)
2611 {
2612         struct client_obd *cli = data;
2613
2614         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2615
2616         osc_io_unplug(env, cli, NULL);
2617         RETURN(0);
2618 }
2619
/**
 * Set up an OSC device: generic client setup, the writeback and LRU
 * ptlrpcd work items, quota, procfs entries and the shared request pool,
 * then register the lock-cancel weight callback.
 *
 * Errors before the procfs stage unwind through the goto ladder below;
 * a procfs setup failure itself is tolerated (the function still returns
 * 0 after that point).
 *
 * \param[in] obd  the device being set up
 * \param[in] lcfg configuration record
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }
        /* note: a nonzero rc from the procfs stage is deliberately ignored
         * from here on — setup still succeeds */

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2719
2720 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2721 {
2722         int rc = 0;
2723         ENTRY;
2724
2725         switch (stage) {
2726         case OBD_CLEANUP_EARLY: {
2727                 struct obd_import *imp;
2728                 imp = obd->u.cli.cl_import;
2729                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2730                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2731                 ptlrpc_deactivate_import(imp);
2732                 spin_lock(&imp->imp_lock);
2733                 imp->imp_pingable = 0;
2734                 spin_unlock(&imp->imp_lock);
2735                 break;
2736         }
2737         case OBD_CLEANUP_EXPORTS: {
2738                 struct client_obd *cli = &obd->u.cli;
2739                 /* LU-464
2740                  * for echo client, export may be on zombie list, wait for
2741                  * zombie thread to cull it, because cli.cl_import will be
2742                  * cleared in client_disconnect_export():
2743                  *   class_export_destroy() -> obd_cleanup() ->
2744                  *   echo_device_free() -> echo_client_cleanup() ->
2745                  *   obd_disconnect() -> osc_disconnect() ->
2746                  *   client_disconnect_export()
2747                  */
2748                 obd_zombie_barrier();
2749                 if (cli->cl_writeback_work) {
2750                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2751                         cli->cl_writeback_work = NULL;
2752                 }
2753                 if (cli->cl_lru_work) {
2754                         ptlrpcd_destroy_work(cli->cl_lru_work);
2755                         cli->cl_lru_work = NULL;
2756                 }
2757                 obd_cleanup_client_import(obd);
2758                 ptlrpc_lprocfs_unregister_obd(obd);
2759                 lprocfs_obd_cleanup(obd);
2760                 break;
2761                 }
2762         }
2763         RETURN(rc);
2764 }
2765
2766 int osc_cleanup(struct obd_device *obd)
2767 {
2768         struct client_obd *cli = &obd->u.cli;
2769         int rc;
2770
2771         ENTRY;
2772
2773         /* lru cleanup */
2774         if (cli->cl_cache != NULL) {
2775                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2776                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2777                 list_del_init(&cli->cl_lru_osc);
2778                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2779                 cli->cl_lru_left = NULL;
2780                 cl_cache_decref(cli->cl_cache);
2781                 cli->cl_cache = NULL;
2782         }
2783
2784         /* free memory of osc quota cache */
2785         osc_quota_cleanup(obd);
2786
2787         rc = client_obd_cleanup(obd);
2788
2789         ptlrpcd_decref();
2790         RETURN(rc);
2791 }
2792
2793 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2794 {
2795         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2796         return rc > 0 ? 0: rc;
2797 }
2798
/* obd_ops hook: forward configuration records to the common handler. */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
2803
/* Method table wiring the OSC implementations into the generic obd
 * framework; the generic client_* helpers are used where no OSC-specific
 * behaviour is needed (connections, import add/del). */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2826
/**
 * Module init: set up the lu_kmem caches, register the OSC obd type
 * (without proc entries when OSP already registered them on this node)
 * and create the shared OST I/O request pool sized from the
 * osc_reqpool_mem_max module parameter.
 *
 * \retval 0 on success, negative errno on failure
 */
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;

        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        /* when OSP is loaded first it owns the shared proc directory;
         * skip per-type proc registration in that case */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        /* This is obviously too much memory, only prevent overflow here */
        /* (mem_max is in MiB; values >= 1<<12 would overflow the
         * unsigned "<< 20" below) */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

        /* round the request size up to the next power of two */
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in OSC pool according to
         * cl_max_rpcs_in_flight. The allocation from the pool will only be
         * tried after normal allocation failed. So a small OSC pool won't
         * cause much performance degression in most of cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        /* success path: rc is still 0 here */
        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}
2887
/* Module unload: unregister the obd type, release the lu_kmem caches and
 * free the shared request pool.
 * NOTE(review): __exit is deliberately commented out in the declaration —
 * reason not visible here; confirm before "fixing" it. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
2894
/* Module metadata and entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);